diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 105 |
1 files changed, 104 insertions, 1 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index e1a8e03dd82..7e619ba2e0f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -25,6 +25,7 @@ #include <Ice/LocalException.h> #include <Ice/ReferenceFactory.h> // For createProxy(). #include <Ice/ProxyFactory.h> // For createProxy(). +#include <Ice/Observer.h> #include <bzlib.h> using namespace std; @@ -118,6 +119,16 @@ private: ConnectionIPtr _connection; }; +Ice::ConnectionState connectionStateMap[] = { + Ice::ConnectionStateInitializing, // StateNotInitialized + Ice::ConnectionStateInitializing, // StateNotValidated + Ice::ConnectionStateActive, // StateActive + Ice::ConnectionStateHolding, // StateHolding + Ice::ConnectionStateClosing, // StateClosing + Ice::ConnectionStateClosed, // StateClosed + Ice::ConnectionStateClosed, // StateFinished +}; + } void @@ -1145,6 +1156,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(!_writeWatch.isStarted()); + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1153,6 +1171,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(!_readWatch.isStarted()); + _readStreamPos = _readStream.i; + _readWatch.start(); + } + _transceiver->startRead(_readStream); } } @@ -1171,10 +1196,20 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } _transceiver->finishWrite(_writeStream); } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } _transceiver->finishRead(_readStream); } } @@ -1219,12 +1254,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) unscheduleTimeout(current.operation); if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) { + if(_observer && _writeStream.i != _writeStream.b.end()) + { + if(_writeWatch.isStarted()) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { assert(!_writeStream.b.empty()); scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } + + if(_observer) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } assert(_writeStream.i == _writeStream.b.end()); } if(current.operation & SocketOperationRead && !_readStream.b.empty()) @@ -1237,6 +1289,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } assert(_readStream.i == _readStream.b.end()); _readHeader = false; + + if(_observer) + { + // + // We can't measure the time to receive the header as it would + // include the wait time. We start the timer now. + // + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0); + _readStreamPos = _readStream.i; + _readWatch.start(); + } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) @@ -1284,6 +1347,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _readStream.i = _readStream.b.begin() + pos; } + if(_observer && _readStream.i != _readStream.b.end()) + { + if(_readWatch.isStarted()) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } + _readStreamPos = _readStream.i; + _readWatch.start(); + } + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1301,6 +1375,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) assert(_readStream.i == _readStream.b.end()); } } + + if(_observer) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } } if(_state <= StateNotValidated) @@ -1778,7 +1858,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _state(StateNotInitialized), _shutdownInitiated(false) { - int& compressionLevel = const_cast<int&>(_compressionLevel); compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( "Ice.Compression.Level", 1); @@ -1826,6 +1905,11 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); } _threadPool->initialize(this); + + if(_instance->initializationData().observerResolver) + { + _observer = _instance->initializationData().observerResolver->getConnectionObserver(_observer, this); + } } catch(const IceUtil::Exception&) { @@ -1833,10 +1917,20 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, throw; } __setNoDelete(false); + + if(_observer) + { + _observer->attach(); + } } Ice::ConnectionI::~ConnectionI() { + if(_observer) + { + _observer->detach(); + } + assert(!_startCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); @@ -2040,6 +2134,15 @@ Ice::ConnectionI::setState(State state) } } + if(_observer) + { + Ice::ConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; + Ice::ConnectionState newState = connectionStateMap[static_cast<int>(state)]; + if(oldState != newState) + { + _observer->stateChanged(oldState, newState); + } + } _state = state; notifyAll(); |