diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-09-10 08:47:58 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-09-10 08:47:58 +0200 |
commit | 9560b7d54ec4411f0605a3b53997835599f70ea2 (patch) | |
tree | c40611c772a7a4f1af4ea0df5d487305dded456d /cpp/src/Ice/OutgoingAsync.cpp | |
parent | Fix (diff) | |
download | ice-9560b7d54ec4411f0605a3b53997835599f70ea2.tar.bz2 ice-9560b7d54ec4411f0605a3b53997835599f70ea2.tar.xz ice-9560b7d54ec4411f0605a3b53997835599f70ea2.zip |
Fixed communicator flushBatchRequests to allow tracing
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 127 |
1 files changed, 70 insertions, 57 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 6f96ed33a8e..2224b7dee97 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -504,6 +504,7 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) { if(!_proxy->ice_isTwoway()) { + _remoteObserver.detach(); if(!_callback || !_callback->__hasSentCallback()) { _observer.detach(); @@ -538,6 +539,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!(_state & Done)); + _remoteObserver.detach(); if(_timerTaskConnection) { _instance->timer()->cancel(this); @@ -576,6 +578,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) // calling on the callback. The LocalExceptionWrapper exception is only called // before the invocation is sent. // + _remoteObserver.detach(); try { @@ -605,6 +608,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get() && !(_state & Done)); + _remoteObserver.detach(); if(_timerTaskConnection) { @@ -883,6 +887,7 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get()); _state |= Done | OK | Sent; + _remoteObserver.detach(); _monitor.notifyAll(); if(_callback && _callback->__hasSentCallback()) { @@ -904,6 +909,7 @@ IceInternal::BatchOutgoingAsync::__sent() void IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool) { + _remoteObserver.detach(); __exception(exc); } @@ -954,6 +960,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co BatchOutgoingAsync(communicator, instance, operation, delegate, cookie), _connection(con) { + _observer.attach(instance.get(), operation); } void @@ -981,7 +988,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons const string& operation, const CallbackBasePtr& delegate, const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(communicator, instance, operation, delegate, cookie) + AsyncResult(communicator, instance, operation, delegate, cookie) { // // _useCount is initialized to 1 to prevent premature callbacks. @@ -994,93 +1001,98 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons // Assume all connections are flushed synchronously. // _sentSynchronously = true; + + // + // Attach observer + // + _observer.attach(instance.get(), operation); } void -IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con) +IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con) { + class BatchOutgoingAsyncI : public BatchOutgoingAsync { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - ++_useCount; - } - CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed, - &CommunicatorBatchOutgoingAsync::sent); - con->begin_flushBatchRequests(cb); -} + public: -void -IceInternal::CommunicatorBatchOutgoingAsync::ready() -{ - check(0, 0, true); -} + BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync) : + BatchOutgoingAsync(outAsync->_communicator, outAsync->_instance, outAsync->_operation, __dummyCallback, 0), + _outAsync(outAsync) + { + } -void -IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r) -{ - ConnectionPtr con = r->getConnection(); - assert(con); + virtual bool __sent(Ice::ConnectionI*) + { + _remoteObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual void __finished(const Ice::LocalException&, bool) + { + _remoteObserver.detach(); + _outAsync->check(false); + } + + virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt) + { + _remoteObserver.attach(_outAsync->_observer.getRemoteObserver(connection, endpt)); + } + + private: + + const CommunicatorBatchOutgoingAsyncPtr _outAsync; + }; - try { - con->end_flushBatchRequests(r); - assert(false); // completed() should only be called when an exception occurs. + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + ++_useCount; } - catch(const Ice::LocalException& ex) + + AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this)); + if(!(status & AsyncStatusSent)) { - check(r, &ex, false); + _sentSynchronously = false; } } void -IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r) +IceInternal::CommunicatorBatchOutgoingAsync::ready() { - check(r, 0, r->sentSynchronously()); + check(true); } void -IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread) +IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) { - bool done = false; - { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(_useCount > 0); - --_useCount; - - // - // We report that the communicator flush request was sent synchronously - // if all of the connection flush requests are sent synchronously. - // - if((r && !r->sentSynchronously()) || ex) - { - _sentSynchronously = false; - } - - if(_useCount == 0) + if(--_useCount > 0) { - done = true; - _state |= Done | OK | Sent; - _monitor.notifyAll(); + return; } + + _observer.detach(); + _state |= Done | OK | Sent; + _monitor.notifyAll(); } - if(done) + // + // _sentSynchronously is immutable here. + // + if(!_sentSynchronously && userThread) { - // - // _sentSynchronously is immutable here. - // - if(!_sentSynchronously && userThread) - { - __sentAsync(); - } - else - { - assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. - BatchOutgoingAsync::__sent(); - } + __sentAsync(); + } + else + { + assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. + AsyncResult::__sent(); } } + namespace { @@ -1152,3 +1164,4 @@ Ice::AMICallbackBase::__sent(bool sentSynchronously) dynamic_cast<AMISentCallback*>(this)->ice_sent(); } } + |