summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp115
1 files changed, 76 insertions, 39 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 43a297405b2..729bc82cc23 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -30,6 +30,7 @@
using namespace std;
using namespace Ice;
+using namespace Ice::Instrumentation;
using namespace IceInternal;
Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
@@ -119,14 +120,14 @@ private:
ConnectionIPtr _connection;
};
-Ice::ObserverConnectionState connectionStateMap[] = {
- Ice::ObserverConnectionStateInitializing, // StateNotInitialized
- Ice::ObserverConnectionStateInitializing, // StateNotValidated
- Ice::ObserverConnectionStateActive, // StateActive
- Ice::ObserverConnectionStateHolding, // StateHolding
- Ice::ObserverConnectionStateClosing, // StateClosing
- Ice::ObserverConnectionStateClosed, // StateClosed
- Ice::ObserverConnectionStateClosed, // StateFinished
+ConnectionState connectionStateMap[] = {
+ ConnectionStateValidating, // StateNotInitialized
+ ConnectionStateValidating, // StateNotValidated
+ ConnectionStateActive, // StateActive
+ ConnectionStateHolding, // StateHolding
+ ConnectionStateClosing, // StateClosing
+ ConnectionStateClosed, // StateClosed
+ ConnectionStateClosed, // StateFinished
};
}
@@ -136,6 +137,10 @@ IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection)
{
Lock sync(*this);
_connections.push_back(connection);
+ if(connection->_observer.get())
+ {
+ connection->_observer->getObserver()->detach();
+ }
}
void
@@ -151,7 +156,7 @@ Ice::ConnectionI::Observer::Observer(const BasicStream& readStream, const BasicS
}
void
-Ice::ConnectionI::Observer::setObserver(const Ice::ConnectionObserverPtr& observer)
+Ice::ConnectionI::Observer::setObserver(const ConnectionObserverPtr& observer)
{
_observer = observer;
}
@@ -500,10 +505,32 @@ void
Ice::ConnectionI::updateObserver()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state < StateNotValidated || _state > StateClosed)
+ {
+ return;
+ }
+
const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
assert(resolver);
- ConnectionObserverPtr obsv = _observer.get() ? _observer->getObserver() : ConnectionObserverPtr();
- obsv = resolver->getConnectionObserver(this, connectionStateMap[static_cast<int>(_state)], obsv);
+ ConnectionObserverPtr obsv;
+ if(_observer.get())
+ {
+ obsv = _observer->getObserver();
+ }
+
+ ConnectionInfoPtr info;
+ if(!_info && _state < StateClosed)
+ {
+ _info = _transceiver->getInfo();
+ _info->connectionId = _endpoint->connectionId();
+ _info->incoming = _connector == 0;
+ _info->adapterName = _adapter ? _adapter->getName() : string();
+ }
+
+ obsv = resolver->getConnectionObserver(info,
+ _endpoint->getInfo(),
+ connectionStateMap[static_cast<int>(_state)],
+ obsv);
if(obsv)
{
if(!_observer.get())
@@ -511,6 +538,7 @@ Ice::ConnectionI::updateObserver()
_observer.reset(new Observer(_readStream, _writeStream));
}
_observer->setObserver(obsv);
+ obsv->attach();
}
else
{
@@ -1823,11 +1851,14 @@ Ice::ConnectionI::getInfo() const
_exception->ice_throw();
}
- ConnectionInfoPtr info = _transceiver->getInfo();
- info->connectionId = _endpoint->connectionId();
- info->incoming = _connector == 0;
- info->adapterName = _adapter ? _adapter->getName() : string();
- return info;
+ if(!_info)
+ {
+ _info = _transceiver->getInfo();
+ _info->connectionId = _endpoint->connectionId();
+ _info->incoming = _connector == 0;
+ _info->adapterName = _adapter ? _adapter->getName() : string();
+ }
+ return _info;
}
void
@@ -1956,17 +1987,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
}
_threadPool->initialize(this);
-
- const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
- if(resolver)
- {
- ConnectionObserverPtr obsv = resolver->getConnectionObserver(this, ObserverConnectionStateInitializing, 0);
- if(obsv)
- {
- _observer.reset(new Observer(_readStream, _writeStream));
- _observer->setObserver(obsv);
- }
- }
}
catch(const IceUtil::Exception&)
{
@@ -1974,20 +1994,10 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
throw;
}
__setNoDelete(false);
-
- if(_observer.get())
- {
- _observer->getObserver()->attach();
- }
}
Ice::ConnectionI::~ConnectionI()
{
- if(_observer.get())
- {
- _observer->getObserver()->detach();
- }
-
assert(!_startCallback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
@@ -2193,12 +2203,19 @@ Ice::ConnectionI::setState(State state)
if(_observer.get())
{
- Ice::ObserverConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
- Ice::ObserverConnectionState newState = connectionStateMap[static_cast<int>(state)];
+ ConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
+ ConnectionState newState = connectionStateMap[static_cast<int>(state)];
if(oldState != newState)
{
_observer->getObserver()->stateChanged(oldState, newState);
}
+ if(state == StateClosed && _exception.get())
+ {
+ if(!dynamic_cast<CloseConnectionException*>(_exception.get()))
+ {
+ _observer->getObserver()->failed(_exception->ice_name());
+ }
+ }
}
_state = state;
@@ -2275,6 +2292,26 @@ Ice::ConnectionI::initialize(SocketOperation operation)
_threadPool->update(this, operation, s);
return false;
}
+
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ if(resolver)
+ {
+ _info = _transceiver->getInfo();
+ _info->connectionId = _endpoint->connectionId();
+ _info->incoming = _connector == 0;
+ _info->adapterName = _adapter ? _adapter->getName() : string();
+
+ ConnectionObserverPtr obsv = resolver->getConnectionObserver(_info,
+ _endpoint->getInfo(),
+ ConnectionStateValidating,
+ 0);
+ if(obsv)
+ {
+ _observer.reset(new Observer(_readStream, _writeStream));
+ _observer->setObserver(obsv);
+ obsv->attach();
+ }
+ }
//
// Update the connection description once the transceiver is initialized.