diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 293 |
1 files changed, 0 insertions, 293 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index bd82a31e9a5..15a2d819260 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -29,7 +29,6 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } #endif const unsigned char OutgoingAsyncBase::OK = 0x1; @@ -1181,298 +1180,6 @@ OutgoingAsync::throwUserException() #endif -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy) -{ -} - -AsyncStatus -ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool) -{ - if(_batchRequestNum == 0) - { - if(sent()) - { - return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); - } - else - { - return AsyncStatusSent; - } - } - _cachedConnection = connection; - return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum); -} - -AsyncStatus -ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) -{ - if(_batchRequestNum == 0) - { - if(sent()) - { - return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); - } - else - { - return AsyncStatusSent; - } - } - return handler->invokeAsyncRequest(this, _batchRequestNum, false); -} - -void -ProxyFlushBatchAsync::invoke(const string& operation) -{ - checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol())); - _observer.attach(_proxy, operation, ::Ice::noExplicitContext); - _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os); - invokeImpl(true); // userThread = true -} - -ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx) -{ -} - -AsyncStatus -ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) -{ - _cachedConnection = connection; - if(responseImpl(true)) - { - invokeResponseAsync(); - } - return AsyncStatusSent; -} - -AsyncStatus -ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) -{ - if(responseImpl(true)) - { - invokeResponseAsync(); - } - return AsyncStatusSent; -} - -Ice::ConnectionPtr -ProxyGetConnection::getConnection() const -{ - return _cachedConnection; -} - -void -ProxyGetConnection::invoke(const string& operation) -{ - _observer.attach(_proxy, operation, ::Ice::noExplicitContext); - invokeImpl(true); // userThread = true -} - -ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : - OutgoingAsyncBase(instance), _connection(connection) -{ -} - -ConnectionPtr -ConnectionFlushBatchAsync::getConnection() const -{ - return _connection; -} - -void -ConnectionFlushBatchAsync::invoke(const string& operation) -{ - _observer.attach(_instance.get(), operation); - try - { - AsyncStatus status; - int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); - if(batchRequestNum == 0) - { - status = AsyncStatusSent; - if(sent()) - { - status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); - } - } - else - { - status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum); - } - - if(status & AsyncStatusSent) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) - { - invokeSent(); - } - } - } - catch(const RetryException& ex) - { - if(exception(*ex.get())) - { - invokeExceptionAsync(); - } - } - catch(const Exception& ex) - { - if(exception(ex)) - { - invokeExceptionAsync(); - } - } -} - -CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync() -{ - // Out of line to avoid weak vtable -} - -CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : - OutgoingAsyncBase(instance) -{ - // - // _useCount is initialized to 1 to prevent premature callbacks. - // The caller must invoke ready() after all flush requests have - // been initiated. - // - _useCount = 1; -} - -void -CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) -{ - class FlushBatch : public OutgoingAsyncBase - { - public: - - FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, - const InstancePtr& instance, - InvocationObserver& observer) : - OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) - { - } - - virtual bool - sent() - { - _childObserver.detach(); - _outAsync->check(false); - return false; - } - - virtual bool - exception(const Exception& ex) - { - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - _outAsync->check(false); - return false; - } - - virtual InvocationObserver& - getObserver() - { - return _observer; - } - - virtual bool handleSent(bool, bool) - { - return false; - } - - virtual bool handleException(const Ice::Exception&) - { - return false; - } - - virtual bool handleResponse(bool) - { - return false; - } - - virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const - { - assert(false); - } - - virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const - { - assert(false); - } - - virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const - { - assert(false); - } - - private: - - const CommunicatorFlushBatchAsyncPtr _outAsync; - InvocationObserver& _observer; - }; - - { - Lock sync(_m); - ++_useCount; - } - - try - { - OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer); - int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); - if(batchRequestNum == 0) - { - flushBatch->sent(); - } - else - { - con->sendAsyncRequest(flushBatch, false, false, batchRequestNum); - } - } - catch(const LocalException&) - { - check(false); - throw; - } -} - -void -CommunicatorFlushBatchAsync::invoke(const string& operation) -{ - _observer.attach(_instance.get(), operation); - _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); - _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); - check(true); -} - -void -CommunicatorFlushBatchAsync::check(bool userThread) -{ - { - Lock sync(_m); - assert(_useCount > 0); - if(--_useCount > 0) - { - return; - } - } - - if(sentImpl(true)) - { - if(userThread) - { - _sentSynchronously = true; - invokeSent(); - } - else - { - invokeSentAsync(); - } - } -} - #ifdef ICE_CPP11_MAPPING bool |