diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 115 |
1 files changed, 76 insertions, 39 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 43a297405b2..729bc82cc23 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -30,6 +30,7 @@ using namespace std; using namespace Ice; +using namespace Ice::Instrumentation; using namespace IceInternal; Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } @@ -119,14 +120,14 @@ private: ConnectionIPtr _connection; }; -Ice::ObserverConnectionState connectionStateMap[] = { - Ice::ObserverConnectionStateInitializing, // StateNotInitialized - Ice::ObserverConnectionStateInitializing, // StateNotValidated - Ice::ObserverConnectionStateActive, // StateActive - Ice::ObserverConnectionStateHolding, // StateHolding - Ice::ObserverConnectionStateClosing, // StateClosing - Ice::ObserverConnectionStateClosed, // StateClosed - Ice::ObserverConnectionStateClosed, // StateFinished +ConnectionState connectionStateMap[] = { + ConnectionStateValidating, // StateNotInitialized + ConnectionStateValidating, // StateNotValidated + ConnectionStateActive, // StateActive + ConnectionStateHolding, // StateHolding + ConnectionStateClosing, // StateClosing + ConnectionStateClosed, // StateClosed + ConnectionStateClosed, // StateFinished }; } @@ -136,6 +137,10 @@ IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection) { Lock sync(*this); _connections.push_back(connection); + if(connection->_observer.get()) + { + connection->_observer->getObserver()->detach(); + } } void @@ -151,7 +156,7 @@ Ice::ConnectionI::Observer::Observer(const BasicStream& readStream, const BasicS } void -Ice::ConnectionI::Observer::setObserver(const Ice::ConnectionObserverPtr& observer) +Ice::ConnectionI::Observer::setObserver(const ConnectionObserverPtr& observer) { _observer = observer; } @@ -500,10 +505,32 @@ void Ice::ConnectionI::updateObserver() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state < StateNotValidated || _state > StateClosed) + { + return; + } + const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver; assert(resolver); - ConnectionObserverPtr obsv = _observer.get() ? _observer->getObserver() : ConnectionObserverPtr(); - obsv = resolver->getConnectionObserver(this, connectionStateMap[static_cast<int>(_state)], obsv); + ConnectionObserverPtr obsv; + if(_observer.get()) + { + obsv = _observer->getObserver(); + } + + ConnectionInfoPtr info; + if(!_info && _state < StateClosed) + { + _info = _transceiver->getInfo(); + _info->connectionId = _endpoint->connectionId(); + _info->incoming = _connector == 0; + _info->adapterName = _adapter ? _adapter->getName() : string(); + } + + obsv = resolver->getConnectionObserver(info, + _endpoint->getInfo(), + connectionStateMap[static_cast<int>(_state)], + obsv); if(obsv) { if(!_observer.get()) @@ -511,6 +538,7 @@ Ice::ConnectionI::updateObserver() _observer.reset(new Observer(_readStream, _writeStream)); } _observer->setObserver(obsv); + obsv->attach(); } else { @@ -1823,11 +1851,14 @@ Ice::ConnectionI::getInfo() const _exception->ice_throw(); } - ConnectionInfoPtr info = _transceiver->getInfo(); - info->connectionId = _endpoint->connectionId(); - info->incoming = _connector == 0; - info->adapterName = _adapter ? _adapter->getName() : string(); - return info; + if(!_info) + { + _info = _transceiver->getInfo(); + _info->connectionId = _endpoint->connectionId(); + _info->incoming = _connector == 0; + _info->adapterName = _adapter ? _adapter->getName() : string(); + } + return _info; } void @@ -1956,17 +1987,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); } _threadPool->initialize(this); - - const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver; - if(resolver) - { - ConnectionObserverPtr obsv = resolver->getConnectionObserver(this, ObserverConnectionStateInitializing, 0); - if(obsv) - { - _observer.reset(new Observer(_readStream, _writeStream)); - _observer->setObserver(obsv); - } - } } catch(const IceUtil::Exception&) { @@ -1974,20 +1994,10 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, throw; } __setNoDelete(false); - - if(_observer.get()) - { - _observer->getObserver()->attach(); - } } Ice::ConnectionI::~ConnectionI() { - if(_observer.get()) - { - _observer->getObserver()->detach(); - } - assert(!_startCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); @@ -2193,12 +2203,19 @@ Ice::ConnectionI::setState(State state) if(_observer.get()) { - Ice::ObserverConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; - Ice::ObserverConnectionState newState = connectionStateMap[static_cast<int>(state)]; + ConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; + ConnectionState newState = connectionStateMap[static_cast<int>(state)]; if(oldState != newState) { _observer->getObserver()->stateChanged(oldState, newState); } + if(state == StateClosed && _exception.get()) + { + if(!dynamic_cast<CloseConnectionException*>(_exception.get())) + { + _observer->getObserver()->failed(_exception->ice_name()); + } + } } _state = state; @@ -2275,6 +2292,26 @@ Ice::ConnectionI::initialize(SocketOperation operation) _threadPool->update(this, operation, s); return false; } + + const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver; + if(resolver) + { + _info = _transceiver->getInfo(); + _info->connectionId = _endpoint->connectionId(); + _info->incoming = _connector == 0; + _info->adapterName = _adapter ? _adapter->getName() : string(); + + ConnectionObserverPtr obsv = resolver->getConnectionObserver(_info, + _endpoint->getInfo(), + ConnectionStateValidating, + 0); + if(obsv) + { + _observer.reset(new Observer(_readStream, _writeStream)); + _observer->setObserver(obsv); + obsv->attach(); + } + } // // Update the connection description once the transceiver is initialized. |