diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 106 |
1 files changed, 55 insertions, 51 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fd722eca237..1b29f6a7664 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -38,6 +38,9 @@ Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } namespace { +const ::std::string __flushBatchRequests_name = "flushBatchRequests"; + + class TimeoutCallback : public IceUtil::TimerTask { public: @@ -501,23 +504,11 @@ Ice::ConnectionI::updateObserver() return; } - if(!_info && _state < StateClosed) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - } - - if(_info) - { - const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer; - assert(comObsv); - _observer.attach(comObsv->getConnectionObserver(_info, - _endpoint->getInfo(), - connectionStateMap[static_cast<int>(_state)], - _observer.get())); - } + assert(_instance->initializationData().observer); + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get())); } void @@ -554,7 +545,6 @@ bool Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) { BasicStream* os = out->os(); - out->attachRemoteObserver(this); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) @@ -570,6 +560,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) assert(_state > StateNotValidated); assert(_state < StateClosing); + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -647,6 +639,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b assert(_state > StateNotValidated); assert(_state < StateClosing); + out->__attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -904,7 +898,8 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get()); + IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name); + BatchOutgoing out(this, _instance.get(), observer); out.invoke(); } @@ -914,13 +909,6 @@ Ice::ConnectionI::begin_flushBatchRequests() return __begin_flushBatchRequests(__dummyCallback, 0); } -namespace -{ - -const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - -} - AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) { @@ -971,6 +959,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) _exception->ice_throw(); } + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { out->sent(false); @@ -1029,6 +1019,8 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; @@ -1824,15 +1816,7 @@ Ice::ConnectionI::getInfo() const { _exception->ice_throw(); } - - if(!_info) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - } - return _info; + return initConnectionInfo(); } void @@ -2177,8 +2161,8 @@ Ice::ConnectionI::setState(State state) if(_observer) { - ConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; - ConnectionState newState = connectionStateMap[static_cast<int>(state)]; + ConnectionState oldState = toConnectionState(_state); + ConnectionState newState = toConnectionState(state); if(oldState != newState) { _observer->stateChanged(oldState, newState); @@ -2271,26 +2255,21 @@ Ice::ConnectionI::initialize(SocketOperation operation) _threadPool->update(this, operation, s); return false; } - - const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer; - if(comObsv) - { - _info = _transceiver->getInfo(); - _info->connectionId = _endpoint->connectionId(); - _info->incoming = _connector == 0; - _info->adapterName = _adapter ? _adapter->getName() : string(); - - _observer.attach(comObsv->getConnectionObserver(_info, - _endpoint->getInfo(), - ConnectionStateValidating, - 0)); - } // // Update the connection description once the transceiver is initialized. // const_cast<string&>(_desc) = _transceiver->toString(); setState(StateNotValidated); + + if(_instance->initializationData().observer) + { + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + ConnectionStateValidating, + 0)); + } + return true; } @@ -3114,3 +3093,28 @@ Ice::ConnectionI::closeTimeout() return _endpoint->timeout(); } } + +Ice::ConnectionInfoPtr +Ice::ConnectionI::initConnectionInfo() const +{ + if(_info) + { + return _info; + } + + ConnectionInfoPtr info = _transceiver->getInfo(); + info->connectionId = _endpoint->connectionId(); + info->incoming = _connector == 0; + info->adapterName = _adapter ? _adapter->getName() : string(); + if(_state > StateNotInitialized) + { + _info = info; // Cache the connection information only if initialized. + } + return info; +} + +ConnectionState +ConnectionI::toConnectionState(State state) const +{ + return connectionStateMap[static_cast<int>(state)]; +} |