diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 105 |
1 files changed, 71 insertions, 34 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 6ae51a0d9eb..5920e58d1c3 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -252,26 +252,31 @@ void IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) { IceUtil::RecMutex::Lock sync(*this); - + // - // In closed and holding state, we are not registered with the - // thread pool. For all other states, we have to notify the thread - // pool in case this event handler changed from a client to a - // server or vice versa. + // We are registered with a thread pool in active and closing + // mode. However, we only change subscription if we're in active + // mode, and thus ignore closing mode here.k // - if (_state != StateHolding && _state != StateClosed) + if (_state == StateActive) { if (adapter && !_adapter) { - _threadPool->clientIsNowServer(); + // + // Client is now server. + // + unregisterWithPool(); } if (!adapter && _adapter) { - _threadPool->serverIsNowClient(); + // + // Server is now client. + // + unregisterWithPool(); } } - + _adapter = adapter; } @@ -283,13 +288,6 @@ IceInternal::Connection::getAdapter() const } bool -IceInternal::Connection::server() const -{ - IceUtil::RecMutex::Lock sync(*this); - return _adapter; -} - -bool IceInternal::Connection::readable() const { return true; @@ -302,7 +300,7 @@ IceInternal::Connection::read(BasicStream& stream) } void -IceInternal::Connection::message(BasicStream& stream) +IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { bool invoke = false; bool batch = false; @@ -310,7 +308,7 @@ IceInternal::Connection::message(BasicStream& stream) { IceUtil::RecMutex::Lock sync(*this); - _threadPool->promoteFollower(); + threadPool->promoteFollower(); if (_state == StateClosed) { @@ -555,15 +553,17 @@ IceInternal::Connection::message(BasicStream& stream) } void -IceInternal::Connection::finished() +IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) { IceUtil::RecMutex::Lock sync(*this); - assert(_state == StateClosed || _state == StateHolding); - - _threadPool->promoteFollower(); + threadPool->promoteFollower(); - if (_state == StateClosed) + if (_state == StateActive || _state == StateClosing) + { + registerWithPool(); + } + else if (_state == StateClosed) { _transceiver->close(); } @@ -578,7 +578,7 @@ IceInternal::Connection::exception(const LocalException& ex) /* bool -IceInternal::Connection::tryDestroy() +IceInternal::Connection::tryDestroy(const ThreadPoolPtr& threadPool) { bool isLocked = trylock(); if(!isLocked) @@ -586,7 +586,7 @@ IceInternal::Connection::tryDestroy() return false; } - _threadPool->promoteFollower(); + threadPool->promoteFollower(); try { @@ -611,12 +611,11 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _transceiver(transceiver), _endpoint(endpoint), _adapter(adapter), - _threadPool(instance->threadPool()), - _logger(instance->logger()), - _traceLevels(instance->traceLevels()), + _logger(_instance->logger()), + _traceLevels(_instance->traceLevels()), _nextRequestId(1), _requestsHint(_requests.end()), - _batchStream(instance), + _batchStream(_instance), _responseCount(0), _proxyUsageCount(0), _state(StateHolding) @@ -713,7 +712,7 @@ IceInternal::Connection::setState(State state) { return; } - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); break; } @@ -723,7 +722,7 @@ IceInternal::Connection::setState(State state) { return; } - _threadPool->unregister(_transceiver->fd()); + unregisterWithPool(); break; } @@ -738,7 +737,7 @@ IceInternal::Connection::setState(State state) // // We need to continue to read data in closing state. // - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); } break; } @@ -752,9 +751,9 @@ IceInternal::Connection::setState(State state) // register again before we unregister, so that // finished() is called correctly. // - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); } - _threadPool->unregister(_transceiver->fd()); + unregisterWithPool(); break; } } @@ -787,3 +786,41 @@ IceInternal::Connection::closeConnection() _transceiver->write(os, _endpoint->timeout()); _transceiver->shutdown(); } + +void +IceInternal::Connection::registerWithPool() +{ + if (_adapter) + { + if (!_serverThreadPool) + { + _serverThreadPool = _instance->serverThreadPool(); + assert(_serverThreadPool); + } + _serverThreadPool->_register(_transceiver->fd(), this); + } + else + { + if (!_clientThreadPool) + { + _clientThreadPool = _instance->clientThreadPool(); + assert(_clientThreadPool); + } + _clientThreadPool->_register(_transceiver->fd(), this); + } +} + +void +IceInternal::Connection::unregisterWithPool() +{ + if (_adapter) + { + assert(_serverThreadPool); + _serverThreadPool->unregister(_transceiver->fd()); + } + else + { + assert(_clientThreadPool); + _clientThreadPool->unregister(_transceiver->fd()); + } +} |