summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp189
1 files changed, 133 insertions, 56 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 46369c022d0..43a297405b2 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -145,6 +145,55 @@ IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connectio
_connections.swap(connections);
}
+Ice::ConnectionI::Observer::Observer(const BasicStream& readStream, const BasicStream& writeStream) :
+ _readStream(readStream), _writeStream(writeStream)
+{
+}
+
+void
+Ice::ConnectionI::Observer::setObserver(const Ice::ConnectionObserverPtr& observer)
+{
+ _observer = observer;
+}
+
+void
+Ice::ConnectionI::Observer::startRead()
+{
+ if(_readWatch.isStarted())
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+}
+
+void
+Ice::ConnectionI::Observer::finishRead()
+{
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+}
+
+void
+Ice::ConnectionI::Observer::startWrite()
+{
+ if(_writeWatch.isStarted())
+ {
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
+ _writeStreamPos = _writeStream.i;
+ _writeWatch.start();
+}
+
+void
+Ice::ConnectionI::Observer::finishWrite()
+{
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+}
+
void
Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
{
@@ -451,8 +500,22 @@ void
Ice::ConnectionI::updateObserver()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(_instance->initializationData().observerResolver);
- _observer = _instance->initializationData().observerResolver->getConnectionObserver(this, _observer);
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ assert(resolver);
+ ConnectionObserverPtr obsv = _observer.get() ? _observer->getObserver() : ConnectionObserverPtr();
+ obsv = resolver->getConnectionObserver(this, connectionStateMap[static_cast<int>(_state)], obsv);
+ if(obsv)
+ {
+ if(!_observer.get())
+ {
+ _observer.reset(new Observer(_readStream, _writeStream));
+ }
+ _observer->setObserver(obsv);
+ }
+ else
+ {
+ _observer.reset(0);
+ }
}
void
@@ -1164,11 +1227,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
- if(_observer)
+ if(_observer.get())
{
- assert(!_writeWatch.isStarted());
- _writeStreamPos = _writeStream.i;
- _writeWatch.start();
+ _observer->startWrite();
}
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
@@ -1179,11 +1240,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
- if(_observer && !_readHeader)
+ if(_observer.get() && !_readHeader)
{
- assert(!_readWatch.isStarted());
- _readStreamPos = _readStream.i;
- _readWatch.start();
+ _observer->startRead();
}
_transceiver->startRead(_readStream);
@@ -1204,19 +1263,17 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
- if(_observer)
+ if(_observer.get())
{
- assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos);
- _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ _observer->finishWrite();
}
_transceiver->finishWrite(_writeStream);
}
else if(operation & SocketOperationRead)
{
- if(_observer && !_readHeader)
+ if(_observer.get() && !_readHeader)
{
- assert(_readStream.i >= _readStreamPos);
- _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ _observer->finishRead();
}
_transceiver->finishRead(_readStream);
}
@@ -1262,15 +1319,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
unscheduleTimeout(current.operation);
if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
{
- if(_observer && _writeStream.i != _writeStream.b.end())
+ if(_observer.get() && _writeStream.i != _writeStream.b.end())
{
- if(_writeWatch.isStarted())
- {
- assert(_writeStream.i >= _writeStreamPos);
- _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
- }
- _writeStreamPos = _writeStream.i;
- _writeWatch.start();
+ _observer->startWrite();
}
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
@@ -1280,10 +1331,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
return;
}
- if(_observer)
+ if(_observer.get())
{
- assert(_writeStream.i >= _writeStreamPos);
- _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ _observer->finishWrite();
}
assert(_writeStream.i == _writeStream.b.end());
}
@@ -1298,15 +1348,13 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
assert(_readStream.i == _readStream.b.end());
_readHeader = false;
- if(_observer)
+ if(_observer.get())
{
//
// We can't measure the time to receive the header as it would
- // include the wait time. We start the timer now.
+ // include the wait time.
//
- _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0);
- _readStreamPos = _readStream.i;
- _readWatch.start();
+ _observer->getObserver()->receivedBytes(static_cast<int>(headerSize), 0);
}
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
@@ -1355,17 +1403,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_readStream.i = _readStream.b.begin() + pos;
}
- if(_observer && _readStream.i != _readStream.b.end())
- {
- if(_readWatch.isStarted())
- {
- assert(_readStream.i >= _readStreamPos);
- _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
- }
- _readStreamPos = _readStream.i;
- _readWatch.start();
- }
-
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
@@ -1374,21 +1411,26 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
+ if(_observer.get())
+ {
+ _observer->startRead();
+ }
+
if(!_transceiver->read(_readStream))
{
assert(!_readStream.b.empty());
scheduleTimeout(SocketOperationRead, _endpoint->timeout());
return;
}
+
+ if(_observer.get())
+ {
+ _observer->finishRead();
+ }
assert(_readStream.i == _readStream.b.end());
}
}
- if(_observer)
- {
- assert(_readStream.i >= _readStreamPos);
- _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
- }
}
if(_state <= StateNotValidated)
@@ -1782,6 +1824,7 @@ Ice::ConnectionI::getInfo() const
}
ConnectionInfoPtr info = _transceiver->getInfo();
+ info->connectionId = _endpoint->connectionId();
info->incoming = _connector == 0;
info->adapterName = _adapter ? _adapter->getName() : string();
return info;
@@ -1914,9 +1957,15 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
}
_threadPool->initialize(this);
- if(_instance->initializationData().observerResolver)
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ if(resolver)
{
- _observer = _instance->initializationData().observerResolver->getConnectionObserver(this, _observer);
+ ConnectionObserverPtr obsv = resolver->getConnectionObserver(this, ObserverConnectionStateInitializing, 0);
+ if(obsv)
+ {
+ _observer.reset(new Observer(_readStream, _writeStream));
+ _observer->setObserver(obsv);
+ }
}
}
catch(const IceUtil::Exception&)
@@ -1926,17 +1975,17 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
}
__setNoDelete(false);
- if(_observer)
+ if(_observer.get())
{
- _observer->attach();
+ _observer->getObserver()->attach();
}
}
Ice::ConnectionI::~ConnectionI()
{
- if(_observer)
+ if(_observer.get())
{
- _observer->detach();
+ _observer->getObserver()->detach();
}
assert(!_startCallback);
@@ -2142,13 +2191,13 @@ Ice::ConnectionI::setState(State state)
}
}
- if(_observer)
+ if(_observer.get())
{
Ice::ObserverConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
Ice::ObserverConnectionState newState = connectionStateMap[static_cast<int>(state)];
if(oldState != newState)
{
- _observer->stateChanged(oldState, newState);
+ _observer->getObserver()->stateChanged(oldState, newState);
}
}
_state = state;
@@ -2257,12 +2306,22 @@ Ice::ConnectionI::validate(SocketOperation operation)
traceSend(_writeStream, _logger, _traceLevels);
}
+ if(_observer.get())
+ {
+ _observer->startWrite();
+ }
+
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
scheduleTimeout(SocketOperationWrite, connectTimeout());
_threadPool->update(this, operation, SocketOperationWrite);
return false;
}
+
+ if(_observer.get())
+ {
+ _observer->finishWrite();
+ }
}
else // The client side has the passive role for connection validation.
{
@@ -2272,6 +2331,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
_readStream.i = _readStream.b.begin();
}
+ if(_observer.get())
+ {
+ _observer->startRead();
+ }
+
if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
{
scheduleTimeout(SocketOperationRead, connectTimeout());
@@ -2279,6 +2343,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
return false;
}
+ if(_observer.get())
+ {
+ _observer->finishRead();
+ }
+
assert(_readStream.i == _readStream.b.end());
_readStream.i = _readStream.b.begin();
Byte m[4];
@@ -2432,6 +2501,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
//
// Send the message.
//
+ if(_observer.get())
+ {
+ _observer->startWrite();
+ }
assert(_writeStream.i);
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
@@ -2439,6 +2512,10 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
return;
}
+ if(_observer.get())
+ {
+ _observer->finishWrite();
+ }
}
}
catch(const Ice::LocalException& ex)