diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 249 |
1 files changed, 231 insertions, 18 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7d1303a00ca..c984fbab901 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -29,6 +29,7 @@ using namespace std; using namespace Ice; +using namespace Ice::Instrumentation; using namespace IceInternal; Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } @@ -36,6 +37,9 @@ Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } namespace { +const ::std::string __flushBatchRequests_name = "flushBatchRequests"; + + class TimeoutCallback : public IceUtil::TimerTask { public: @@ -118,6 +122,16 @@ private: ConnectionIPtr _connection; }; +ConnectionState connectionStateMap[] = { + ConnectionStateValidating, // StateNotInitialized + ConnectionStateValidating, // StateNotValidated + ConnectionStateActive, // StateActive + ConnectionStateHolding, // StateHolding + ConnectionStateClosing, // StateClosing + ConnectionStateClosed, // StateClosed + ConnectionStateClosed, // StateFinished +}; + } void @@ -125,6 +139,10 @@ IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection) { Lock sync(*this); _connections.push_back(connection); + if(connection->_observer) + { + connection->_observer.detach(); + } } void @@ -134,6 +152,46 @@ IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connectio _connections.swap(connections); } +Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) +{ +} + +void +Ice::ConnectionI::Observer::startRead(Ice::Byte* i) +{ + if(_readStreamPos) + { + _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); + } + _readStreamPos = i; +} + +void +Ice::ConnectionI::Observer::finishRead(Ice::Byte* i) +{ + assert(i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); + _readStreamPos = 0; +} + +void +Ice::ConnectionI::Observer::startWrite(Ice::Byte* i) +{ + if(_writeStreamPos) + { + _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); + } + _writeStreamPos = i; +} + +void +Ice::ConnectionI::Observer::finishWrite(Ice::Byte* i) +{ + assert(i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); + _writeStreamPos = 0; +} + void Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) { @@ -437,6 +495,22 @@ Ice::ConnectionI::waitUntilFinished() } void +Ice::ConnectionI::updateObserver() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state < StateNotValidated || _state > StateClosed) + { + return; + } + + assert(_instance->initializationData().observer); + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get())); +} + +void Ice::ConnectionI::monitor(const IceUtil::Time& now) { IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); @@ -485,6 +559,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) assert(_state > StateNotValidated); assert(_state < StateClosing); + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -562,6 +638,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b assert(_state > StateNotValidated); assert(_state < StateClosing); + out->__attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -819,7 +897,8 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get()); + IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name); + BatchOutgoing out(this, _instance.get(), observer); out.invoke(); } @@ -829,13 +908,6 @@ Ice::ConnectionI::begin_flushBatchRequests() return __begin_flushBatchRequests(__dummyCallback, 0); } -namespace -{ - -const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - -} - AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) { @@ -886,6 +958,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) _exception->ice_throw(); } + out->attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { out->sent(false); @@ -944,6 +1018,8 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; @@ -1145,6 +1221,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + _observer.startWrite(_writeStream.i); + } + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1153,6 +1234,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + _observer.startRead(_readStream.i); + } + _transceiver->startRead(_readStream); } } @@ -1172,10 +1258,18 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) if(operation & SocketOperationWrite) { _transceiver->finishWrite(_writeStream); + if(_observer) + { + _observer.finishWrite(_writeStream.i); + } } else if(operation & SocketOperationRead) { _transceiver->finishRead(_readStream); + if(_observer && !_readHeader) + { + _observer.finishRead(_readStream.i); + } } } catch(const Ice::LocalException& ex) @@ -1219,11 +1313,24 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) unscheduleTimeout(current.operation); if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) { - if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) + if(_writeStream.i != _writeStream.b.end()) { - assert(!_writeStream.b.empty()); - scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); - return; + if(_observer) + { + _observer.startWrite(_writeStream.i); + } + + if(!_transceiver->write(_writeStream)) + { + assert(!_writeStream.b.empty()); + scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); + return; + } + + if(_observer) + { + _observer.finishWrite(_writeStream.i); + } } assert(_writeStream.i == _writeStream.b.end()); } @@ -1237,6 +1344,11 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } assert(_readStream.i == _readStream.b.end()); _readHeader = false; + + if(_observer) + { + _observer->receivedBytes(static_cast<int>(headerSize)); + } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) @@ -1292,15 +1404,26 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { + if(_observer) + { + _observer.startRead(_readStream.i); + } + if(!_transceiver->read(_readStream)) { assert(!_readStream.b.empty()); scheduleTimeout(SocketOperationRead, _endpoint->timeout()); return; } + + if(_observer) + { + _observer.finishRead(_readStream.i); + } assert(_readStream.i == _readStream.b.end()); } } + } if(_state <= StateNotValidated) @@ -1692,11 +1815,7 @@ Ice::ConnectionI::getInfo() const { _exception->ice_throw(); } - - ConnectionInfoPtr info = _transceiver->getInfo(); - info->incoming = _connector == 0; - info->adapterName = _adapter ? _adapter->getName() : string(); - return info; + return initConnectionInfo(); } void @@ -1778,7 +1897,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _state(StateNotInitialized), _shutdownInitiated(false) { - int& compressionLevel = const_cast<int&>(_compressionLevel); compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( "Ice.Compression.Level", 1); @@ -2040,6 +2158,30 @@ Ice::ConnectionI::setState(State state) } } + if(_instance->initializationData().observer) + { + ConnectionState oldState = toConnectionState(_state); + ConnectionState newState = toConnectionState(state); + if(oldState != newState) + { + _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer.get())); + } + if(_observer && state == StateClosed && _exception.get()) + { + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) + { + _observer->failed(_exception->ice_name()); + } + } + } _state = state; notifyAll(); @@ -2121,6 +2263,7 @@ Ice::ConnectionI::initialize(SocketOperation operation) // const_cast<string&>(_desc) = _transceiver->toString(); setState(StateNotValidated); + return true; } @@ -2146,12 +2289,22 @@ Ice::ConnectionI::validate(SocketOperation operation) traceSend(_writeStream, _logger, _traceLevels); } + if(_observer) + { + _observer.startWrite(_writeStream.i); + } + if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { scheduleTimeout(SocketOperationWrite, connectTimeout()); _threadPool->update(this, operation, SocketOperationWrite); return false; } + + if(_observer) + { + _observer.finishWrite(_writeStream.i); + } } else // The client side has the passive role for connection validation. { @@ -2161,6 +2314,11 @@ Ice::ConnectionI::validate(SocketOperation operation) _readStream.i = _readStream.b.begin(); } + if(_observer) + { + _observer.startRead(_readStream.i); + } + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) { scheduleTimeout(SocketOperationRead, connectTimeout()); @@ -2168,6 +2326,11 @@ Ice::ConnectionI::validate(SocketOperation operation) return false; } + if(_observer) + { + _observer.finishRead(_readStream.i); + } + assert(_readStream.i == _readStream.b.end()); _readStream.i = _readStream.b.begin(); Byte m[4]; @@ -2321,6 +2484,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // // Send the message. // + if(_observer) + { + _observer.startWrite(_writeStream.i); + } assert(_writeStream.i); if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { @@ -2328,6 +2495,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } + if(_observer) + { + _observer.finishWrite(_writeStream.i); + } } } catch(const Ice::LocalException& ex) @@ -2396,8 +2567,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // + if(_observer) + { + _observer.startWrite(stream.i); + } if(_transceiver->write(stream)) { + if(_observer) + { + _observer.finishWrite(stream.i); + } + AsyncStatus status = AsyncStatusSent; if(message.sent(this, false)) { @@ -2449,8 +2629,16 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // + if(_observer) + { + _observer.startWrite(message.stream->i); + } if(_transceiver->write(*message.stream)) { + if(_observer) + { + _observer.finishWrite(message.stream->i); + } AsyncStatus status = AsyncStatusSent; if(message.sent(this, false)) { @@ -2899,3 +3087,28 @@ Ice::ConnectionI::closeTimeout() return _endpoint->timeout(); } } + +Ice::ConnectionInfoPtr +Ice::ConnectionI::initConnectionInfo() const +{ + if(_info) + { + return _info; + } + + ConnectionInfoPtr info = _transceiver->getInfo(); + info->connectionId = _endpoint->connectionId(); + info->incoming = _connector == 0; + info->adapterName = _adapter ? _adapter->getName() : string(); + if(_state > StateNotInitialized) + { + _info = info; // Cache the connection information only if initialized. + } + return info; +} + +ConnectionState +ConnectionI::toConnectionState(State state) const +{ + return connectionStateMap[static_cast<int>(state)]; +} |