summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2012-10-08 15:03:46 +0200
committerBenoit Foucher <benoit@zeroc.com>2012-10-08 15:03:46 +0200
commitd36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae (patch)
tree7abffd29e98ff112cec85b658fab404961f8306a /cpp/src/Ice/OutgoingAsync.cpp
parentFreeBSD port (diff)
parentWin32 fixes (diff)
downloadice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.tar.bz2
ice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.tar.xz
ice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.zip
Merge branch 'mx' into encoding11
Conflicts: cpp/demo/Freeze/backup/.depend cpp/demo/Freeze/backup/.depend.mak cpp/demo/Freeze/bench/.depend cpp/demo/Freeze/bench/.depend.mak cpp/demo/Freeze/casino/.depend cpp/demo/Freeze/casino/.depend.mak cpp/demo/Freeze/customEvictor/.depend cpp/demo/Freeze/customEvictor/.depend.mak cpp/demo/Freeze/library/.depend cpp/demo/Freeze/library/.depend.mak cpp/demo/Freeze/phonebook/.depend cpp/demo/Freeze/phonebook/.depend.mak cpp/demo/Freeze/transform/.depend cpp/demo/Freeze/transform/.depend.mak cpp/demo/Glacier2/callback/.depend cpp/demo/Glacier2/callback/.depend.mak cpp/demo/Glacier2/chat/.depend cpp/demo/Glacier2/chat/.depend.mak cpp/demo/Ice/async/.depend cpp/demo/Ice/async/.depend.mak cpp/demo/Ice/bidir/.depend cpp/demo/Ice/bidir/.depend.mak cpp/demo/Ice/callback/.depend cpp/demo/Ice/callback/.depend.mak cpp/demo/Ice/converter/.depend cpp/demo/Ice/converter/.depend.mak cpp/demo/Ice/hello/.depend cpp/demo/Ice/hello/.depend.mak cpp/demo/Ice/interleaved/.depend cpp/demo/Ice/interleaved/.depend.mak cpp/demo/Ice/invoke/.depend cpp/demo/Ice/invoke/.depend.mak cpp/demo/Ice/latency/.depend cpp/demo/Ice/latency/.depend.mak cpp/demo/Ice/minimal/.depend cpp/demo/Ice/minimal/.depend.mak cpp/demo/Ice/multicast/.depend cpp/demo/Ice/multicast/.depend.mak cpp/demo/Ice/nested/.depend cpp/demo/Ice/nested/.depend.mak cpp/demo/Ice/nrvo/.depend cpp/demo/Ice/nrvo/.depend.mak cpp/demo/Ice/plugin/.depend cpp/demo/Ice/plugin/.depend.mak cpp/demo/Ice/session/.depend cpp/demo/Ice/session/.depend.mak cpp/demo/Ice/throughput/.depend cpp/demo/Ice/throughput/.depend.mak cpp/demo/Ice/value/.depend cpp/demo/Ice/value/.depend.mak cpp/demo/IceBox/hello/.depend cpp/demo/IceBox/hello/.depend.mak cpp/demo/IceGrid/allocate/.depend cpp/demo/IceGrid/allocate/.depend.mak cpp/demo/IceGrid/icebox/.depend cpp/demo/IceGrid/icebox/.depend.mak cpp/demo/IceGrid/replication/.depend cpp/demo/IceGrid/replication/.depend.mak cpp/demo/IceGrid/secure/.depend cpp/demo/IceGrid/secure/.depend.mak cpp/demo/IceGrid/sessionActivation/.depend cpp/demo/IceGrid/sessionActivation/.depend.mak cpp/demo/IceGrid/simple/.depend cpp/demo/IceGrid/simple/.depend.mak cpp/demo/IceStorm/clock/.depend cpp/demo/IceStorm/clock/.depend.mak cpp/demo/IceStorm/counter/.depend cpp/demo/IceStorm/counter/.depend.mak cpp/demo/IceStorm/replicated/.depend cpp/demo/IceStorm/replicated/.depend.mak cpp/demo/IceStorm/replicated2/.depend cpp/demo/IceStorm/replicated2/.depend.mak cpp/demo/book/evictor_filesystem/.depend cpp/demo/book/evictor_filesystem/.depend.mak cpp/demo/book/lifecycle/.depend cpp/demo/book/lifecycle/.depend.mak cpp/demo/book/map_filesystem/.depend cpp/demo/book/map_filesystem/.depend.mak cpp/demo/book/printer/.depend cpp/demo/book/printer/.depend.mak cpp/demo/book/simple_filesystem/.depend cpp/demo/book/simple_filesystem/.depend.mak cpp/include/Ice/Outgoing.h cpp/include/Ice/OutgoingAsync.h cpp/include/Ice/StreamTraits.h cpp/src/Freeze/.depend cpp/src/Freeze/.depend.mak cpp/src/FreezeScript/.depend cpp/src/FreezeScript/.depend.mak cpp/src/Glacier2/.depend cpp/src/Glacier2/.depend.mak cpp/src/Glacier2Lib/.depend cpp/src/Glacier2Lib/.depend.mak cpp/src/Ice/.depend cpp/src/Ice/.depend.mak cpp/src/IceBox/.depend cpp/src/IceBox/.depend.mak cpp/src/IceDB/.depend cpp/src/IceDB/.depend.mak cpp/src/IceGrid/.depend cpp/src/IceGrid/.depend.mak cpp/src/IceGrid/FreezeDB/.depend cpp/src/IceGrid/FreezeDB/.depend.mak cpp/src/IceGrid/ServerCache.h cpp/src/IceGrid/ServerI.h cpp/src/IceGridLib/.depend cpp/src/IceGridLib/.depend.mak cpp/src/IcePatch2/.depend cpp/src/IcePatch2/.depend.mak cpp/src/IcePatch2Lib/.depend cpp/src/IcePatch2Lib/.depend.mak cpp/src/IceSSL/.depend cpp/src/IceSSL/.depend.mak cpp/src/IceStorm/.depend cpp/src/IceStorm/.depend.mak cpp/src/IceStorm/FreezeDB/.depend cpp/src/IceStorm/FreezeDB/.depend.mak cpp/src/IceStormLib/.depend cpp/src/IceStormLib/.depend.mak cpp/src/slice2cpp/Gen.cpp cpp/test/Freeze/complex/.depend cpp/test/Freeze/complex/.depend.mak cpp/test/Freeze/dbmap/.depend cpp/test/Freeze/dbmap/.depend.mak cpp/test/Freeze/evictor/.depend cpp/test/Freeze/evictor/.depend.mak cpp/test/Freeze/fileLock/.depend cpp/test/Freeze/fileLock/.depend.mak cpp/test/FreezeScript/dbmap/.depend cpp/test/FreezeScript/dbmap/.depend.mak cpp/test/FreezeScript/evictor/.depend cpp/test/FreezeScript/evictor/.depend.mak cpp/test/Glacier2/attack/.depend cpp/test/Glacier2/attack/.depend.mak cpp/test/Glacier2/dynamicFiltering/.depend cpp/test/Glacier2/dynamicFiltering/.depend.mak cpp/test/Glacier2/override/.depend cpp/test/Glacier2/override/.depend.mak cpp/test/Glacier2/router/.depend cpp/test/Glacier2/router/.depend.mak cpp/test/Glacier2/sessionControl/.depend cpp/test/Glacier2/sessionControl/.depend.mak cpp/test/Glacier2/sessionHelper/.depend cpp/test/Glacier2/sessionHelper/.depend.mak cpp/test/Glacier2/ssl/.depend cpp/test/Glacier2/ssl/.depend.mak cpp/test/Glacier2/staticFiltering/.depend cpp/test/Glacier2/staticFiltering/.depend.mak cpp/test/Ice/adapterDeactivation/.depend cpp/test/Ice/adapterDeactivation/.depend.mak cpp/test/Ice/ami/.depend cpp/test/Ice/ami/.depend.mak cpp/test/Ice/background/.depend cpp/test/Ice/background/.depend.mak cpp/test/Ice/binding/.depend cpp/test/Ice/binding/.depend.mak cpp/test/Ice/checksum/.depend cpp/test/Ice/checksum/.depend.mak cpp/test/Ice/checksum/server/.depend cpp/test/Ice/checksum/server/.depend.mak cpp/test/Ice/custom/.depend cpp/test/Ice/custom/.depend.mak cpp/test/Ice/defaultServant/.depend cpp/test/Ice/defaultServant/.depend.mak cpp/test/Ice/defaultValue/.depend cpp/test/Ice/defaultValue/.depend.mak cpp/test/Ice/dispatcher/.depend cpp/test/Ice/dispatcher/.depend.mak cpp/test/Ice/exceptions/.depend cpp/test/Ice/exceptions/.depend.mak cpp/test/Ice/facets/.depend cpp/test/Ice/facets/.depend.mak cpp/test/Ice/faultTolerance/.depend cpp/test/Ice/faultTolerance/.depend.mak cpp/test/Ice/gc/.depend cpp/test/Ice/gc/.depend.mak cpp/test/Ice/hash/.depend cpp/test/Ice/hash/.depend.mak cpp/test/Ice/hold/.depend cpp/test/Ice/hold/.depend.mak cpp/test/Ice/info/.depend cpp/test/Ice/info/.depend.mak cpp/test/Ice/inheritance/.depend cpp/test/Ice/inheritance/.depend.mak cpp/test/Ice/interceptor/.depend cpp/test/Ice/interceptor/.depend.mak cpp/test/Ice/invoke/.depend cpp/test/Ice/invoke/.depend.mak cpp/test/Ice/location/.depend cpp/test/Ice/location/.depend.mak cpp/test/Ice/objects/.depend cpp/test/Ice/objects/.depend.mak cpp/test/Ice/operations/.depend cpp/test/Ice/operations/.depend.mak cpp/test/Ice/plugin/.depend cpp/test/Ice/plugin/.depend.mak cpp/test/Ice/properties/.depend cpp/test/Ice/properties/.depend.mak cpp/test/Ice/proxy/.depend cpp/test/Ice/proxy/.depend.mak cpp/test/Ice/retry/.depend cpp/test/Ice/retry/.depend.mak cpp/test/Ice/servantLocator/.depend cpp/test/Ice/servantLocator/.depend.mak cpp/test/Ice/slicing/exceptions/.depend cpp/test/Ice/slicing/exceptions/.depend.mak cpp/test/Ice/slicing/objects/.depend cpp/test/Ice/slicing/objects/.depend.mak cpp/test/Ice/stream/.depend cpp/test/Ice/stream/.depend.mak cpp/test/Ice/stringConverter/.depend cpp/test/Ice/stringConverter/.depend.mak cpp/test/Ice/threadPoolPriority/.depend cpp/test/Ice/threadPoolPriority/.depend.mak cpp/test/Ice/timeout/.depend cpp/test/Ice/timeout/.depend.mak cpp/test/Ice/udp/.depend cpp/test/Ice/udp/.depend.mak cpp/test/IceBox/configuration/.depend cpp/test/IceBox/configuration/.depend.mak cpp/test/IceGrid/activation/.depend cpp/test/IceGrid/activation/.depend.mak cpp/test/IceGrid/admin/.depend cpp/test/IceGrid/admin/.depend.mak cpp/test/IceGrid/allocation/.depend cpp/test/IceGrid/allocation/.depend.mak cpp/test/IceGrid/deployer/.depend cpp/test/IceGrid/deployer/.depend.mak cpp/test/IceGrid/distribution/.depend cpp/test/IceGrid/distribution/.depend.mak cpp/test/IceGrid/replicaGroup/.depend cpp/test/IceGrid/replicaGroup/.depend.mak cpp/test/IceGrid/replication/.depend cpp/test/IceGrid/replication/.depend.mak cpp/test/IceGrid/session/.depend cpp/test/IceGrid/session/.depend.mak cpp/test/IceGrid/simple/.depend cpp/test/IceGrid/simple/.depend.mak cpp/test/IceGrid/update/.depend cpp/test/IceGrid/update/.depend.mak cpp/test/IceSSL/configuration/.depend cpp/test/IceSSL/configuration/.depend.mak cpp/test/IceStorm/federation/.depend cpp/test/IceStorm/federation/.depend.mak cpp/test/IceStorm/federation2/.depend cpp/test/IceStorm/federation2/.depend.mak cpp/test/IceStorm/rep1/.depend cpp/test/IceStorm/rep1/.depend.mak cpp/test/IceStorm/repgrid/.depend cpp/test/IceStorm/repgrid/.depend.mak cpp/test/IceStorm/repstress/.depend cpp/test/IceStorm/repstress/.depend.mak cpp/test/IceStorm/single/.depend cpp/test/IceStorm/single/.depend.mak cpp/test/IceStorm/stress/.depend cpp/test/IceStorm/stress/.depend.mak cpp/test/Slice/keyword/.depend cpp/test/Slice/keyword/.depend.mak cpp/test/Slice/parser/.depend cpp/test/Slice/parser/.depend.mak cpp/test/Slice/structure/.depend cpp/test/Slice/structure/.depend.mak py/modules/IcePy/.depend py/modules/IcePy/.depend.mak
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp167
1 files changed, 103 insertions, 64 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index dad736c7e14..a1f5c9ef77e 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -286,6 +286,8 @@ Ice::AsyncResult::__exception(const Ice::Exception& ex)
__warning();
}
}
+
+ _observer.detach();
}
void
@@ -326,6 +328,8 @@ Ice::AsyncResult::__response()
__warning();
}
}
+
+ _observer.detach();
}
void
@@ -425,6 +429,8 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
_mode = mode;
_sentSynchronously = false;
+ _observer.attach(_proxy.get(), operation, context);
+
//
// Can't call async via a batch proxy.
//
@@ -498,6 +504,11 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
if(!_proxy->ice_isTwoway())
{
+ _remoteObserver.detach();
+ if(!_callback || !_callback->__hasSentCallback())
+ {
+ _observer.detach();
+ }
_state |= Done | OK;
}
else if(connection->timeout() > 0)
@@ -516,6 +527,10 @@ void
IceInternal::OutgoingAsync::__sent()
{
::Ice::AsyncResult::__sent();
+ if(!_proxy->ice_isTwoway())
+ {
+ _observer.detach();
+ }
}
void
@@ -524,6 +539,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
_instance->timer()->cancel(this);
@@ -562,6 +578,7 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
// calling on the callback. The LocalExceptionWrapper exception is only called
// before the invocation is sent.
//
+ _remoteObserver.detach();
try
{
@@ -591,6 +608,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get() && !(_state & Done));
+ _remoteObserver.detach();
if(_timerTaskConnection)
{
@@ -783,11 +801,11 @@ IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
{
if(_mode == Nonmutating || _mode == Idempotent)
{
- return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
+ return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer);
}
else
{
- return _proxy->__handleExceptionWrapper(_delegate, ex);
+ return _proxy->__handleExceptionWrapper(_delegate, ex, _observer);
}
}
@@ -824,16 +842,16 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool
{
if(_mode == Nonmutating || _mode == Idempotent)
{
- return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
+ return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer);
}
else
{
- return _proxy->__handleExceptionWrapper(_delegate, ex);
+ return _proxy->__handleExceptionWrapper(_delegate, ex, _observer);
}
}
catch(const Ice::LocalException& ex)
{
- return _proxy->__handleException(_delegate, ex, false, _cnt);
+ return _proxy->__handleException(_delegate, ex, false, _cnt, _observer);
}
return 0; // Keep the compiler happy.
}
@@ -869,8 +887,17 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get());
_state |= Done | OK | Sent;
+ _remoteObserver.detach();
_monitor.notifyAll();
- return _callback && _callback->__hasSentCallback();
+ if(_callback && _callback->__hasSentCallback())
+ {
+ return true;
+ }
+ else
+ {
+ _observer.detach();
+ return false;
+ }
}
void
@@ -882,6 +909,7 @@ IceInternal::BatchOutgoingAsync::__sent()
void
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
{
+ _remoteObserver.detach();
__exception(exc);
}
@@ -892,6 +920,7 @@ IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectP
BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie),
_proxy(proxy)
{
+ _observer.attach(proxy.get(), operation, 0);
}
void
@@ -918,7 +947,7 @@ IceInternal::ProxyBatchOutgoingAsync::__send()
}
catch(const ::Ice::LocalException& ex)
{
- _proxy->__handleException(delegate, ex, 0, cnt);
+ _proxy->__handleException(delegate, ex, 0, cnt, _observer);
}
}
@@ -931,6 +960,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co
BatchOutgoingAsync(communicator, instance, operation, delegate, cookie),
_connection(con)
{
+ _observer.attach(instance.get(), operation);
}
void
@@ -958,7 +988,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
const string& operation,
const CallbackBasePtr& delegate,
const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(communicator, instance, operation, delegate, cookie)
+ AsyncResult(communicator, instance, operation, delegate, cookie)
{
//
// _useCount is initialized to 1 to prevent premature callbacks.
@@ -971,93 +1001,101 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
// Assume all connections are flushed synchronously.
//
_sentSynchronously = true;
+
+ //
+ // Attach observer
+ //
+ _observer.attach(instance.get(), operation);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
+IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con)
{
+ class BatchOutgoingAsyncI : public BatchOutgoingAsync
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- ++_useCount;
- }
- CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
- &CommunicatorBatchOutgoingAsync::sent);
- con->begin_flushBatchRequests(cb);
-}
+ public:
-void
-IceInternal::CommunicatorBatchOutgoingAsync::ready()
-{
- check(0, 0, true);
-}
+ BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
+ _outAsync(outAsync), _observer(observer)
+ {
+ }
-void
-IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
-{
- ConnectionPtr con = r->getConnection();
- assert(con);
+ virtual bool __sent(Ice::ConnectionI*)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual void __finished(const Ice::LocalException&, bool)
+ {
+ _remoteObserver.detach();
+ _outAsync->check(false);
+ }
+
+ virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt)
+ {
+ _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt));
+ }
+
+ private:
+
+ const CommunicatorBatchOutgoingAsyncPtr _outAsync;
+ InvocationObserver& _observer;
+ };
- try
{
- con->end_flushBatchRequests(r);
- assert(false); // completed() should only be called when an exception occurs.
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ ++_useCount;
}
- catch(const Ice::LocalException& ex)
+
+ AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer));
+ if(!(status & AsyncStatusSent))
{
- check(r, &ex, false);
+ _sentSynchronously = false;
}
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
+IceInternal::CommunicatorBatchOutgoingAsync::ready()
{
- check(r, 0, r->sentSynchronously());
+ check(true);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
+IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
{
- bool done = false;
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(_useCount > 0);
- --_useCount;
-
- //
- // We report that the communicator flush request was sent synchronously
- // if all of the connection flush requests are sent synchronously.
- //
- if((r && !r->sentSynchronously()) || ex)
+ if(--_useCount > 0)
{
- _sentSynchronously = false;
- }
-
- if(_useCount == 0)
- {
- done = true;
- _state |= Done | OK | Sent;
- _monitor.notifyAll();
+ return;
}
+
+ _observer.detach();
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
}
- if(done)
+ //
+ // _sentSynchronously is immutable here.
+ //
+ if(!_sentSynchronously && userThread)
{
- //
- // _sentSynchronously is immutable here.
- //
- if(!_sentSynchronously && userThread)
- {
- __sentAsync();
- }
- else
- {
- assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
- BatchOutgoingAsync::__sent();
- }
+ __sentAsync();
+ }
+ else
+ {
+ assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
+ AsyncResult::__sent();
}
}
+
namespace
{
@@ -1129,3 +1167,4 @@ Ice::AMICallbackBase::__sent(bool sentSynchronously)
dynamic_cast<AMISentCallback*>(this)->ice_sent();
}
}
+