summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp140
1 files changed, 139 insertions, 1 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index bdd5b1ea0c3..b32d8f1fa43 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -36,6 +36,8 @@ IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
const unsigned char Ice::AsyncResult::OK = 0x1;
const unsigned char Ice::AsyncResult::Done = 0x2;
@@ -153,7 +155,7 @@ bool
Ice::AsyncResult::isSent() const
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- return _state & (Sent | Done);
+ return _state & Sent;
}
void
@@ -914,6 +916,142 @@ IceInternal::ProxyBatchOutgoingAsync::__send()
}
}
+IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ BatchOutgoingAsync(instance, operation, delegate, cookie),
+ _connection(con)
+{
+}
+
+void
+IceInternal::ConnectionBatchOutgoingAsync::__send()
+{
+ AsyncStatus status = _connection->flushAsyncBatchRequests(this);
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ __sent();
+ }
+ }
+}
+
+Ice::ConnectionPtr
+IceInternal::ConnectionBatchOutgoingAsync::getConnection() const
+{
+ return _connection;
+}
+
+IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ BatchOutgoingAsync(instance, operation, delegate, cookie),
+ _communicator(communicator)
+{
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+
+ //
+ // Assume all connections are flushed synchronously.
+ //
+ _sentSynchronously = true;
+}
+
+void
+IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ ++_useCount;
+ }
+ CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
+ &CommunicatorBatchOutgoingAsync::sent);
+ con->begin_flushBatchRequests(cb);
+}
+
+void
+IceInternal::CommunicatorBatchOutgoingAsync::ready()
+{
+ check(0, 0, true);
+}
+
+void
+IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
+{
+ ConnectionPtr con = r->getConnection();
+ assert(con);
+
+ try
+ {
+ con->end_flushBatchRequests(r);
+ assert(false); // completed() should only be called when an exception occurs.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ check(r, &ex, false);
+ }
+}
+
+void
+IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
+{
+ check(r, 0, r->sentSynchronously());
+}
+
+void
+IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
+{
+ bool done = false;
+
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(_useCount > 0);
+ --_useCount;
+
+ //
+ // We report that the communicator flush request was sent synchronously
+ // if all of the connection flush requests are sent synchronously.
+ //
+ if((r && !r->sentSynchronously()) || ex)
+ {
+ _sentSynchronously = false;
+ }
+
+ if(_useCount == 0)
+ {
+ done = true;
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
+ }
+ }
+
+ if(done)
+ {
+ //
+ // _sentSynchronously is immutable here.
+ //
+ if(!_sentSynchronously && userThread)
+ {
+ __sentAsync();
+ }
+ else
+ {
+ assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
+ BatchOutgoingAsync::__sent();
+ }
+ }
+}
+
namespace
{