diff options
Diffstat (limited to 'java/src/IceInternal/Connection.java')
-rw-r--r-- | java/src/IceInternal/Connection.java | 109 |
1 files changed, 70 insertions, 39 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index c757f9ce824..79f1e0d8eae 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -287,21 +287,26 @@ public final class Connection extends EventHandler try { // - // In closed and holding state, we are not registered with the - // thread pool. For all other states, we have to notify the thread - // pool in case this event handler changed from a client to a - // server or vice versa. + // We are registered with a thread pool in active and closing + // mode. However, we only change subscription if we're in active + // mode, and thus ignore closing mode here.k // - if (_state != StateHolding && _state != StateClosed) + if (_state == StateActive) { if (adapter != null && _adapter == null) { - _threadPool.clientIsNowServer(); + // + // Client is now server. + // + unregisterWithPool(); } if (adapter == null && _adapter != null) { - _threadPool.serverIsNowClient(); + // + // Server is now client. + // + unregisterWithPool(); } } @@ -331,20 +336,6 @@ public final class Connection extends EventHandler // Operations from EventHandler // public boolean - server() - { - _mutex.lock(); - try - { - return _adapter != null; - } - finally - { - _mutex.unlock(); - } - } - - public boolean readable() { return true; @@ -371,7 +362,7 @@ public final class Connection extends EventHandler }; public void - message(BasicStream stream) + message(BasicStream stream, ThreadPool threadPool) { Incoming in = null; boolean batch = false; @@ -379,7 +370,7 @@ public final class Connection extends EventHandler _mutex.lock(); try { - _threadPool.promoteFollower(); + threadPool.promoteFollower(); if (_state == StateClosed) { @@ -626,16 +617,18 @@ public final class Connection extends EventHandler } public void - finished() + finished(ThreadPool threadPool) { _mutex.lock(); try { - assert(_state == StateClosed || _state == StateHolding); + threadPool.promoteFollower(); - _threadPool.promoteFollower(); - - if (_state == StateClosed) + if (_state == StateActive || _state == StateClosing) + { + registerWithPool(); + } + else if (_state == StateClosed) { _transceiver.close(); } @@ -662,7 +655,7 @@ public final class Connection extends EventHandler /* public boolean - tryDestroy() + tryDestroy(ThreadPool threadPool) { boolean isLocked = _mutex.trylock(); if (!isLocked) @@ -670,7 +663,7 @@ public final class Connection extends EventHandler return false; } - _threadPool.promoteFollower(); + threadPool.promoteFollower(); try { @@ -684,14 +677,12 @@ public final class Connection extends EventHandler } */ - Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, - Ice.ObjectAdapter adapter) + Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter) { super(instance); _transceiver = transceiver; _endpoint = endpoint; _adapter = adapter; - _threadPool = instance.threadPool(); _logger = instance.logger(); _traceLevels = instance.traceLevels(); _nextRequestId = 1; @@ -814,7 +805,7 @@ public final class Connection extends EventHandler { return; } - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); break; } @@ -824,7 +815,7 @@ public final class Connection extends EventHandler { return; } - _threadPool.unregister(_transceiver.fd()); + unregisterWithPool(); break; } @@ -839,7 +830,7 @@ public final class Connection extends EventHandler // // We need to continue to read data in closing state. // - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); } break; } @@ -853,9 +844,10 @@ public final class Connection extends EventHandler // register again before we unregister, so that // finished() is called correctly. // - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); } - _threadPool.unregister(_transceiver.fd()); + unregisterWithPool(); + super._stream.destroy(); break; } } @@ -889,6 +881,44 @@ public final class Connection extends EventHandler } private void + registerWithPool() + { + if (_adapter != null) + { + if (_serverThreadPool == null) + { + _serverThreadPool = _instance.serverThreadPool(); + assert(_serverThreadPool != null); + } + _serverThreadPool._register(_transceiver.fd(), this); + } + else + { + if (_clientThreadPool == null) + { + _clientThreadPool = _instance.clientThreadPool(); + assert(_clientThreadPool != null); + } + _clientThreadPool._register(_transceiver.fd(), this); + } + } + + private void + unregisterWithPool() + { + if (_adapter != null) + { + assert(_serverThreadPool != null); + _serverThreadPool.unregister(_transceiver.fd()); + } + else + { + assert(_clientThreadPool != null); + _clientThreadPool.unregister(_transceiver.fd()); + } + } + + private void warning(String msg, Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -927,9 +957,10 @@ public final class Connection extends EventHandler private Transceiver _transceiver; private Endpoint _endpoint; private Ice.ObjectAdapter _adapter; - private ThreadPool _threadPool; private Ice.Logger _logger; private TraceLevels _traceLevels; + private ThreadPool _clientThreadPool; + private ThreadPool _serverThreadPool; private int _nextRequestId; private IntMap _requests = new IntMap(); private Ice.LocalException _exception; |