diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 140 |
1 files changed, 139 insertions, 1 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index bdd5b1ea0c3..b32d8f1fa43 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -36,6 +36,8 @@ IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } +IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } +IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } const unsigned char Ice::AsyncResult::OK = 0x1; const unsigned char Ice::AsyncResult::Done = 0x2; @@ -153,7 +155,7 @@ bool Ice::AsyncResult::isSent() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & (Sent | Done); + return _state & Sent; } void @@ -914,6 +916,142 @@ IceInternal::ProxyBatchOutgoingAsync::__send() } } +IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + BatchOutgoingAsync(instance, operation, delegate, cookie), + _connection(con) +{ +} + +void +IceInternal::ConnectionBatchOutgoingAsync::__send() +{ + AsyncStatus status = _connection->flushAsyncBatchRequests(this); + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + __sent(); + } + } +} + +Ice::ConnectionPtr +IceInternal::ConnectionBatchOutgoingAsync::getConnection() const +{ + return _connection; +} + +IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + BatchOutgoingAsync(instance, operation, delegate, cookie), + _communicator(communicator) +{ + // + // _useCount is initialized to 1 to prevent premature callbacks. + // The caller must invoke ready() after all flush requests have + // been initiated. + // + _useCount = 1; + + // + // Assume all connections are flushed synchronously. + // + _sentSynchronously = true; +} + +void +IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + ++_useCount; + } + CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed, + &CommunicatorBatchOutgoingAsync::sent); + con->begin_flushBatchRequests(cb); +} + +void +IceInternal::CommunicatorBatchOutgoingAsync::ready() +{ + check(0, 0, true); +} + +void +IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r) +{ + ConnectionPtr con = r->getConnection(); + assert(con); + + try + { + con->end_flushBatchRequests(r); + assert(false); // completed() should only be called when an exception occurs. + } + catch(const Ice::LocalException& ex) + { + check(r, &ex, false); + } +} + +void +IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r) +{ + check(r, 0, r->sentSynchronously()); +} + +void +IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread) +{ + bool done = false; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(_useCount > 0); + --_useCount; + + // + // We report that the communicator flush request was sent synchronously + // if all of the connection flush requests are sent synchronously. + // + if((r && !r->sentSynchronously()) || ex) + { + _sentSynchronously = false; + } + + if(_useCount == 0) + { + done = true; + _state |= Done | OK | Sent; + _monitor.notifyAll(); + } + } + + if(done) + { + // + // _sentSynchronously is immutable here. + // + if(!_sentSynchronously && userThread) + { + __sentAsync(); + } + else + { + assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. + BatchOutgoingAsync::__sent(); + } + } +} + namespace { |