summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp105
1 files changed, 104 insertions, 1 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index e1a8e03dd82..7e619ba2e0f 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -25,6 +25,7 @@
#include <Ice/LocalException.h>
#include <Ice/ReferenceFactory.h> // For createProxy().
#include <Ice/ProxyFactory.h> // For createProxy().
+#include <Ice/Observer.h>
#include <bzlib.h>
using namespace std;
@@ -118,6 +119,16 @@ private:
ConnectionIPtr _connection;
};
+Ice::ConnectionState connectionStateMap[] = {
+ Ice::ConnectionStateInitializing, // StateNotInitialized
+ Ice::ConnectionStateInitializing, // StateNotValidated
+ Ice::ConnectionStateActive, // StateActive
+ Ice::ConnectionStateHolding, // StateHolding
+ Ice::ConnectionStateClosing, // StateClosing
+ Ice::ConnectionStateClosed, // StateClosed
+ Ice::ConnectionStateClosed, // StateFinished
+};
+
}
void
@@ -1145,6 +1156,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ if(_observer)
+ {
+ assert(!_writeWatch.isStarted());
+ _writeStreamPos = _writeStream.i;
+ _writeWatch.start();
+ }
+
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
{
// The whole message is written, assume it's sent now for at-most-once semantics.
@@ -1153,6 +1171,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
+ if(_observer && !_readHeader)
+ {
+ assert(!_readWatch.isStarted());
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
+
_transceiver->startRead(_readStream);
}
}
@@ -1171,10 +1196,20 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ if(_observer)
+ {
+ assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
_transceiver->finishWrite(_writeStream);
}
else if(operation & SocketOperationRead)
{
+ if(_observer && !_readHeader)
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
_transceiver->finishRead(_readStream);
}
}
@@ -1219,12 +1254,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
unscheduleTimeout(current.operation);
if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
{
+ if(_observer && _writeStream.i != _writeStream.b.end())
+ {
+ if(_writeWatch.isStarted())
+ {
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
+ _writeStreamPos = _writeStream.i;
+ _writeWatch.start();
+ }
+
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
assert(!_writeStream.b.empty());
scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
return;
}
+
+ if(_observer)
+ {
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
assert(_writeStream.i == _writeStream.b.end());
}
if(current.operation & SocketOperationRead && !_readStream.b.empty())
@@ -1237,6 +1289,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
assert(_readStream.i == _readStream.b.end());
_readHeader = false;
+
+ if(_observer)
+ {
+ //
+ // We can't measure the time to receive the header as it would
+ // include the wait time. We start the timer now.
+ //
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0);
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
if(pos < headerSize)
@@ -1284,6 +1347,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_readStream.i = _readStream.b.begin() + pos;
}
+ if(_observer && _readStream.i != _readStream.b.end())
+ {
+ if(_readWatch.isStarted())
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
+
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
@@ -1301,6 +1375,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
assert(_readStream.i == _readStream.b.end());
}
}
+
+ if(_observer)
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
}
if(_state <= StateNotValidated)
@@ -1778,7 +1858,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_state(StateNotInitialized),
_shutdownInitiated(false)
{
-
int& compressionLevel = const_cast<int&>(_compressionLevel);
compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
"Ice.Compression.Level", 1);
@@ -1826,6 +1905,11 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
}
_threadPool->initialize(this);
+
+ if(_instance->initializationData().observerResolver)
+ {
+ _observer = _instance->initializationData().observerResolver->getConnectionObserver(_observer, this);
+ }
}
catch(const IceUtil::Exception&)
{
@@ -1833,10 +1917,20 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
throw;
}
__setNoDelete(false);
+
+ if(_observer)
+ {
+ _observer->attach();
+ }
}
Ice::ConnectionI::~ConnectionI()
{
+ if(_observer)
+ {
+ _observer->detach();
+ }
+
assert(!_startCallback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
@@ -2040,6 +2134,15 @@ Ice::ConnectionI::setState(State state)
}
}
+ if(_observer)
+ {
+ Ice::ConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
+ Ice::ConnectionState newState = connectionStateMap[static_cast<int>(state)];
+ if(oldState != newState)
+ {
+ _observer->stateChanged(oldState, newState);
+ }
+ }
_state = state;
notifyAll();