summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp127
1 files changed, 70 insertions, 57 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 6f96ed33a8e..2224b7dee97 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -504,6 +504,7 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
if(!_proxy->ice_isTwoway())
{
+ _remoteObserver.detach();
if(!_callback || !_callback->__hasSentCallback())
{
_observer.detach();
@@ -538,6 +539,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
_instance->timer()->cancel(this);
@@ -576,6 +578,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
// calling on the callback. The LocalExceptionWrapper exception is only called
// before the invocation is sent.
//
+ _remoteObserver.detach();
try
{
@@ -605,6 +608,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get() && !(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
@@ -883,6 +887,7 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get());
_state |= Done | OK | Sent;
+ _remoteObserver.detach();
_monitor.notifyAll();
if(_callback && _callback->__hasSentCallback())
{
@@ -904,6 +909,7 @@ IceInternal::BatchOutgoingAsync::__sent()
void
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
{
+ _remoteObserver.detach();
__exception(exc);
}
@@ -954,6 +960,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co
BatchOutgoingAsync(communicator, instance, operation, delegate, cookie),
_connection(con)
{
+ _observer.attach(instance.get(), operation);
}
void
@@ -981,7 +988,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
const string& operation,
const CallbackBasePtr& delegate,
const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(communicator, instance, operation, delegate, cookie)
+ AsyncResult(communicator, instance, operation, delegate, cookie)
{
//
// _useCount is initialized to 1 to prevent premature callbacks.
@@ -994,93 +1001,98 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
// Assume all connections are flushed synchronously.
//
_sentSynchronously = true;
+
+ //
+ // Attach observer
+ //
+ _observer.attach(instance.get(), operation);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
+IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con)
{
+ class BatchOutgoingAsyncI : public BatchOutgoingAsync
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- ++_useCount;
- }
- CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
- &CommunicatorBatchOutgoingAsync::sent);
- con->begin_flushBatchRequests(cb);
-}
+ public:
-void
-IceInternal::CommunicatorBatchOutgoingAsync::ready()
-{
- check(0, 0, true);
-}
+ BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync) :
+ BatchOutgoingAsync(outAsync->_communicator, outAsync->_instance, outAsync->_operation, __dummyCallback, 0),
+ _outAsync(outAsync)
+ {
+ }
-void
-IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
-{
- ConnectionPtr con = r->getConnection();
- assert(con);
+ virtual bool __sent(Ice::ConnectionI*)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual void __finished(const Ice::LocalException&, bool)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ }
+
+ virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt)
+ {
+ _remoteObserver.attach(_outAsync->_observer.getRemoteObserver(connection, endpt));
+ }
+
+ private:
+
+ const CommunicatorBatchOutgoingAsyncPtr _outAsync;
+ };
- try
{
- con->end_flushBatchRequests(r);
- assert(false); // completed() should only be called when an exception occurs.
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ ++_useCount;
}
- catch(const Ice::LocalException& ex)
+
+ AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this));
+ if(!(status & AsyncStatusSent))
{
- check(r, &ex, false);
+ _sentSynchronously = false;
}
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
+IceInternal::CommunicatorBatchOutgoingAsync::ready()
{
- check(r, 0, r->sentSynchronously());
+ check(true);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
+IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
{
- bool done = false;
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(_useCount > 0);
- --_useCount;
-
- //
- // We report that the communicator flush request was sent synchronously
- // if all of the connection flush requests are sent synchronously.
- //
- if((r && !r->sentSynchronously()) || ex)
- {
- _sentSynchronously = false;
- }
-
- if(_useCount == 0)
+ if(--_useCount > 0)
{
- done = true;
- _state |= Done | OK | Sent;
- _monitor.notifyAll();
+ return;
}
+
+ _observer.detach();
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
}
- if(done)
+ //
+ // _sentSynchronously is immutable here.
+ //
+ if(!_sentSynchronously && userThread)
{
- //
- // _sentSynchronously is immutable here.
- //
- if(!_sentSynchronously && userThread)
- {
- __sentAsync();
- }
- else
- {
- assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
- BatchOutgoingAsync::__sent();
- }
+ __sentAsync();
+ }
+ else
+ {
+ assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
+ AsyncResult::__sent();
}
}
+
namespace
{
@@ -1152,3 +1164,4 @@ Ice::AMICallbackBase::__sent(bool sentSynchronously)
dynamic_cast<AMISentCallback*>(this)->ice_sent();
}
}
+