diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 189 |
1 files changed, 133 insertions, 56 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 46369c022d0..43a297405b2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -145,6 +145,55 @@ IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connectio _connections.swap(connections); } +Ice::ConnectionI::Observer::Observer(const BasicStream& readStream, const BasicStream& writeStream) : + _readStream(readStream), _writeStream(writeStream) +{ +} + +void +Ice::ConnectionI::Observer::setObserver(const Ice::ConnectionObserverPtr& observer) +{ + _observer = observer; +} + +void +Ice::ConnectionI::Observer::startRead() +{ + if(_readWatch.isStarted()) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } + _readStreamPos = _readStream.i; + _readWatch.start(); +} + +void +Ice::ConnectionI::Observer::finishRead() +{ + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); +} + +void +Ice::ConnectionI::Observer::startWrite() +{ + if(_writeWatch.isStarted()) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } + _writeStreamPos = _writeStream.i; + _writeWatch.start(); +} + +void +Ice::ConnectionI::Observer::finishWrite() +{ + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); +} + void Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) { @@ -451,8 +500,22 @@ void Ice::ConnectionI::updateObserver() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_instance->initializationData().observerResolver); - _observer = _instance->initializationData().observerResolver->getConnectionObserver(this, _observer); + const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver; + assert(resolver); + ConnectionObserverPtr obsv = _observer.get() ? _observer->getObserver() : ConnectionObserverPtr(); + obsv = resolver->getConnectionObserver(this, connectionStateMap[static_cast<int>(_state)], obsv); + if(obsv) + { + if(!_observer.get()) + { + _observer.reset(new Observer(_readStream, _writeStream)); + } + _observer->setObserver(obsv); + } + else + { + _observer.reset(0); + } } void @@ -1164,11 +1227,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - if(_observer) + if(_observer.get()) { - assert(!_writeWatch.isStarted()); - _writeStreamPos = _writeStream.i; - _writeWatch.start(); + _observer->startWrite(); } if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) @@ -1179,11 +1240,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(_observer && !_readHeader) + if(_observer.get() && !_readHeader) { - assert(!_readWatch.isStarted()); - _readStreamPos = _readStream.i; - _readWatch.start(); + _observer->startRead(); } _transceiver->startRead(_readStream); @@ -1204,19 +1263,17 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - if(_observer) + if(_observer.get()) { - assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + _observer->finishWrite(); } _transceiver->finishWrite(_writeStream); } else if(operation & SocketOperationRead) { - if(_observer && !_readHeader) + if(_observer.get() && !_readHeader) { - assert(_readStream.i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + _observer->finishRead(); } _transceiver->finishRead(_readStream); } @@ -1262,15 +1319,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) unscheduleTimeout(current.operation); if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) { - if(_observer && _writeStream.i != _writeStream.b.end()) + if(_observer.get() && _writeStream.i != _writeStream.b.end()) { - if(_writeWatch.isStarted()) - { - assert(_writeStream.i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); - } - _writeStreamPos = _writeStream.i; - _writeWatch.start(); + _observer->startWrite(); } if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) @@ -1280,10 +1331,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } - if(_observer) + if(_observer.get()) { - assert(_writeStream.i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + _observer->finishWrite(); } assert(_writeStream.i == _writeStream.b.end()); } @@ -1298,15 +1348,13 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) assert(_readStream.i == _readStream.b.end()); _readHeader = false; - if(_observer) + if(_observer.get()) { // // We can't measure the time to receive the header as it would - // include the wait time. We start the timer now. + // include the wait time. // - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0); - _readStreamPos = _readStream.i; - _readWatch.start(); + _observer->getObserver()->receivedBytes(static_cast<int>(headerSize), 0); } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); @@ -1355,17 +1403,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _readStream.i = _readStream.b.begin() + pos; } - if(_observer && _readStream.i != _readStream.b.end()) - { - if(_readWatch.isStarted()) - { - assert(_readStream.i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); - } - _readStreamPos = _readStream.i; - _readWatch.start(); - } - if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1374,21 +1411,26 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { + if(_observer.get()) + { + _observer->startRead(); + } + if(!_transceiver->read(_readStream)) { assert(!_readStream.b.empty()); scheduleTimeout(SocketOperationRead, _endpoint->timeout()); return; } + + if(_observer.get()) + { + _observer->finishRead(); + } assert(_readStream.i == _readStream.b.end()); } } - if(_observer) - { - assert(_readStream.i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); - } } if(_state <= StateNotValidated) @@ -1782,6 +1824,7 @@ Ice::ConnectionI::getInfo() const } ConnectionInfoPtr info = _transceiver->getInfo(); + info->connectionId = _endpoint->connectionId(); info->incoming = _connector == 0; info->adapterName = _adapter ? _adapter->getName() : string(); return info; @@ -1914,9 +1957,15 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, } _threadPool->initialize(this); - if(_instance->initializationData().observerResolver) + const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver; + if(resolver) { - _observer = _instance->initializationData().observerResolver->getConnectionObserver(this, _observer); + ConnectionObserverPtr obsv = resolver->getConnectionObserver(this, ObserverConnectionStateInitializing, 0); + if(obsv) + { + _observer.reset(new Observer(_readStream, _writeStream)); + _observer->setObserver(obsv); + } } } catch(const IceUtil::Exception&) @@ -1926,17 +1975,17 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, } __setNoDelete(false); - if(_observer) + if(_observer.get()) { - _observer->attach(); + _observer->getObserver()->attach(); } } Ice::ConnectionI::~ConnectionI() { - if(_observer) + if(_observer.get()) { - _observer->detach(); + _observer->getObserver()->detach(); } assert(!_startCallback); @@ -2142,13 +2191,13 @@ Ice::ConnectionI::setState(State state) } } - if(_observer) + if(_observer.get()) { Ice::ObserverConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; Ice::ObserverConnectionState newState = connectionStateMap[static_cast<int>(state)]; if(oldState != newState) { - _observer->stateChanged(oldState, newState); + _observer->getObserver()->stateChanged(oldState, newState); } } _state = state; @@ -2257,12 +2306,22 @@ Ice::ConnectionI::validate(SocketOperation operation) traceSend(_writeStream, _logger, _traceLevels); } + if(_observer.get()) + { + _observer->startWrite(); + } + if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { scheduleTimeout(SocketOperationWrite, connectTimeout()); _threadPool->update(this, operation, SocketOperationWrite); return false; } + + if(_observer.get()) + { + _observer->finishWrite(); + } } else // The client side has the passive role for connection validation. { @@ -2272,6 +2331,11 @@ Ice::ConnectionI::validate(SocketOperation operation) _readStream.i = _readStream.b.begin(); } + if(_observer.get()) + { + _observer->startRead(); + } + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) { scheduleTimeout(SocketOperationRead, connectTimeout()); @@ -2279,6 +2343,11 @@ Ice::ConnectionI::validate(SocketOperation operation) return false; } + if(_observer.get()) + { + _observer->finishRead(); + } + assert(_readStream.i == _readStream.b.end()); _readStream.i = _readStream.b.begin(); Byte m[4]; @@ -2432,6 +2501,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // // Send the message. // + if(_observer.get()) + { + _observer->startWrite(); + } assert(_writeStream.i); if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { @@ -2439,6 +2512,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } + if(_observer.get()) + { + _observer->finishWrite(); + } } } catch(const Ice::LocalException& ex) |