summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp249
1 files changed, 231 insertions, 18 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 7d1303a00ca..c984fbab901 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -29,6 +29,7 @@
using namespace std;
using namespace Ice;
+using namespace Ice::Instrumentation;
using namespace IceInternal;
Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
@@ -36,6 +37,9 @@ Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
namespace
{
+const ::std::string __flushBatchRequests_name = "flushBatchRequests";
+
+
class TimeoutCallback : public IceUtil::TimerTask
{
public:
@@ -118,6 +122,16 @@ private:
ConnectionIPtr _connection;
};
+ConnectionState connectionStateMap[] = {
+ ConnectionStateValidating, // StateNotInitialized
+ ConnectionStateValidating, // StateNotValidated
+ ConnectionStateActive, // StateActive
+ ConnectionStateHolding, // StateHolding
+ ConnectionStateClosing, // StateClosing
+ ConnectionStateClosed, // StateClosed
+ ConnectionStateClosed, // StateFinished
+};
+
}
void
@@ -125,6 +139,10 @@ IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection)
{
Lock sync(*this);
_connections.push_back(connection);
+ if(connection->_observer)
+ {
+ connection->_observer.detach();
+ }
}
void
@@ -134,6 +152,46 @@ IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connectio
_connections.swap(connections);
}
+Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
+{
+}
+
+void
+Ice::ConnectionI::Observer::startRead(Ice::Byte* i)
+{
+ if(_readStreamPos)
+ {
+ _observer->receivedBytes(static_cast<int>(i - _readStreamPos));
+ }
+ _readStreamPos = i;
+}
+
+void
+Ice::ConnectionI::Observer::finishRead(Ice::Byte* i)
+{
+ assert(i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(i - _readStreamPos));
+ _readStreamPos = 0;
+}
+
+void
+Ice::ConnectionI::Observer::startWrite(Ice::Byte* i)
+{
+ if(_writeStreamPos)
+ {
+ _observer->sentBytes(static_cast<int>(i - _writeStreamPos));
+ }
+ _writeStreamPos = i;
+}
+
+void
+Ice::ConnectionI::Observer::finishWrite(Ice::Byte* i)
+{
+ assert(i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(i - _writeStreamPos));
+ _writeStreamPos = 0;
+}
+
void
Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
{
@@ -437,6 +495,22 @@ Ice::ConnectionI::waitUntilFinished()
}
void
+Ice::ConnectionI::updateObserver()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state < StateNotValidated || _state > StateClosed)
+ {
+ return;
+ }
+
+ assert(_instance->initializationData().observer);
+ _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer.get()));
+}
+
+void
Ice::ConnectionI::monitor(const IceUtil::Time& now)
{
IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this);
@@ -485,6 +559,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -562,6 +638,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
assert(_state > StateNotValidated);
assert(_state < StateClosing);
+ out->__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -819,7 +897,8 @@ Ice::ConnectionI::abortBatchRequest()
void
Ice::ConnectionI::flushBatchRequests()
{
- BatchOutgoing out(this, _instance.get());
+ IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name);
+ BatchOutgoing out(this, _instance.get(), observer);
out.invoke();
}
@@ -829,13 +908,6 @@ Ice::ConnectionI::begin_flushBatchRequests()
return __begin_flushBatchRequests(__dummyCallback, 0);
}
-namespace
-{
-
-const ::std::string __flushBatchRequests_name = "flushBatchRequests";
-
-}
-
AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
{
@@ -886,6 +958,8 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
_exception->ice_throw();
}
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
out->sent(false);
@@ -944,6 +1018,8 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_exception->ice_throw();
}
+ outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint);
+
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
@@ -1145,6 +1221,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ if(_observer)
+ {
+ _observer.startWrite(_writeStream.i);
+ }
+
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
{
// The whole message is written, assume it's sent now for at-most-once semantics.
@@ -1153,6 +1234,11 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
+ if(_observer && !_readHeader)
+ {
+ _observer.startRead(_readStream.i);
+ }
+
_transceiver->startRead(_readStream);
}
}
@@ -1172,10 +1258,18 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
if(operation & SocketOperationWrite)
{
_transceiver->finishWrite(_writeStream);
+ if(_observer)
+ {
+ _observer.finishWrite(_writeStream.i);
+ }
}
else if(operation & SocketOperationRead)
{
_transceiver->finishRead(_readStream);
+ if(_observer && !_readHeader)
+ {
+ _observer.finishRead(_readStream.i);
+ }
}
}
catch(const Ice::LocalException& ex)
@@ -1219,11 +1313,24 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
unscheduleTimeout(current.operation);
if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
{
- if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
+ if(_writeStream.i != _writeStream.b.end())
{
- assert(!_writeStream.b.empty());
- scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
- return;
+ if(_observer)
+ {
+ _observer.startWrite(_writeStream.i);
+ }
+
+ if(!_transceiver->write(_writeStream))
+ {
+ assert(!_writeStream.b.empty());
+ scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
+ return;
+ }
+
+ if(_observer)
+ {
+ _observer.finishWrite(_writeStream.i);
+ }
}
assert(_writeStream.i == _writeStream.b.end());
}
@@ -1237,6 +1344,11 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
assert(_readStream.i == _readStream.b.end());
_readHeader = false;
+
+ if(_observer)
+ {
+ _observer->receivedBytes(static_cast<int>(headerSize));
+ }
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
if(pos < headerSize)
@@ -1292,15 +1404,26 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
+ if(_observer)
+ {
+ _observer.startRead(_readStream.i);
+ }
+
if(!_transceiver->read(_readStream))
{
assert(!_readStream.b.empty());
scheduleTimeout(SocketOperationRead, _endpoint->timeout());
return;
}
+
+ if(_observer)
+ {
+ _observer.finishRead(_readStream.i);
+ }
assert(_readStream.i == _readStream.b.end());
}
}
+
}
if(_state <= StateNotValidated)
@@ -1692,11 +1815,7 @@ Ice::ConnectionI::getInfo() const
{
_exception->ice_throw();
}
-
- ConnectionInfoPtr info = _transceiver->getInfo();
- info->incoming = _connector == 0;
- info->adapterName = _adapter ? _adapter->getName() : string();
- return info;
+ return initConnectionInfo();
}
void
@@ -1778,7 +1897,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_state(StateNotInitialized),
_shutdownInitiated(false)
{
-
int& compressionLevel = const_cast<int&>(_compressionLevel);
compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
"Ice.Compression.Level", 1);
@@ -2040,6 +2158,30 @@ Ice::ConnectionI::setState(State state)
}
}
+ if(_instance->initializationData().observer)
+ {
+ ConnectionState oldState = toConnectionState(_state);
+ ConnectionState newState = toConnectionState(state);
+ if(oldState != newState)
+ {
+ _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer.get()));
+ }
+ if(_observer && state == StateClosed && _exception.get())
+ {
+ if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
+ dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
+ (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing)))
+ {
+ _observer->failed(_exception->ice_name());
+ }
+ }
+ }
_state = state;
notifyAll();
@@ -2121,6 +2263,7 @@ Ice::ConnectionI::initialize(SocketOperation operation)
//
const_cast<string&>(_desc) = _transceiver->toString();
setState(StateNotValidated);
+
return true;
}
@@ -2146,12 +2289,22 @@ Ice::ConnectionI::validate(SocketOperation operation)
traceSend(_writeStream, _logger, _traceLevels);
}
+ if(_observer)
+ {
+ _observer.startWrite(_writeStream.i);
+ }
+
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
scheduleTimeout(SocketOperationWrite, connectTimeout());
_threadPool->update(this, operation, SocketOperationWrite);
return false;
}
+
+ if(_observer)
+ {
+ _observer.finishWrite(_writeStream.i);
+ }
}
else // The client side has the passive role for connection validation.
{
@@ -2161,6 +2314,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
_readStream.i = _readStream.b.begin();
}
+ if(_observer)
+ {
+ _observer.startRead(_readStream.i);
+ }
+
if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
{
scheduleTimeout(SocketOperationRead, connectTimeout());
@@ -2168,6 +2326,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
return false;
}
+ if(_observer)
+ {
+ _observer.finishRead(_readStream.i);
+ }
+
assert(_readStream.i == _readStream.b.end());
_readStream.i = _readStream.b.begin();
Byte m[4];
@@ -2321,6 +2484,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
// Send the message.
//
+ if(_observer)
+ {
+ _observer.startWrite(_writeStream.i);
+ }
assert(_writeStream.i);
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
@@ -2328,6 +2495,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
return;
}
+ if(_observer)
+ {
+ _observer.finishWrite(_writeStream.i);
+ }
}
}
catch(const Ice::LocalException& ex)
@@ -2396,8 +2567,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Send the message without blocking.
//
+ if(_observer)
+ {
+ _observer.startWrite(stream.i);
+ }
if(_transceiver->write(stream))
{
+ if(_observer)
+ {
+ _observer.finishWrite(stream.i);
+ }
+
AsyncStatus status = AsyncStatusSent;
if(message.sent(this, false))
{
@@ -2449,8 +2629,16 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
//
// Send the message without blocking.
//
+ if(_observer)
+ {
+ _observer.startWrite(message.stream->i);
+ }
if(_transceiver->write(*message.stream))
{
+ if(_observer)
+ {
+ _observer.finishWrite(message.stream->i);
+ }
AsyncStatus status = AsyncStatusSent;
if(message.sent(this, false))
{
@@ -2899,3 +3087,28 @@ Ice::ConnectionI::closeTimeout()
return _endpoint->timeout();
}
}
+
+Ice::ConnectionInfoPtr
+Ice::ConnectionI::initConnectionInfo() const
+{
+ if(_info)
+ {
+ return _info;
+ }
+
+ ConnectionInfoPtr info = _transceiver->getInfo();
+ info->connectionId = _endpoint->connectionId();
+ info->incoming = _connector == 0;
+ info->adapterName = _adapter ? _adapter->getName() : string();
+ if(_state > StateNotInitialized)
+ {
+ _info = info; // Cache the connection information only if initialized.
+ }
+ return info;
+}
+
+ConnectionState
+ConnectionI::toConnectionState(State state) const
+{
+ return connectionStateMap[static_cast<int>(state)];
+}