diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 224 |
1 files changed, 119 insertions, 105 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index f32e5f3b547..513ba5b287d 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -26,7 +26,7 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; } +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } bool OutgoingAsyncBase::sent() @@ -40,10 +40,17 @@ OutgoingAsyncBase::completed(const Exception& ex) return finished(ex); } -void -OutgoingAsyncBase::retryException(const Exception& ex) +bool +OutgoingAsyncBase::completed() +{ + assert(false); // Must be overriden by request that can handle responses + return false; +} + +BasicStream* +OutgoingAsyncBase::getIs() { - assert(false); + return 0; // Must be overriden by request that can handle responses } OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, @@ -129,7 +136,7 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex) // require could end up waiting for the flush of the // connection to be done. // - handleRetryException(ex); + _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry. _instance->retryQueue()->add(this, 0); } catch(const Ice::Exception& exc) @@ -238,9 +245,9 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } return; // We're done! } - catch(const RetryException& ex) + catch(const RetryException&) { - handleRetryException(*ex.get()); + _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry. } catch(const Exception& ex) { @@ -267,8 +274,8 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) // // If called from the user thread we re-throw, the exception // will be catch by the caller and abort() will be called. - // - if(userThread) + // + if(userThread) { throw; } @@ -313,12 +320,6 @@ ProxyOutgoingAsyncBase::finished(bool ok) return AsyncResult::finished(ok); } -void -ProxyOutgoingAsyncBase::handleRetryException(const Ice::Exception&) -{ - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry. -} - int ProxyOutgoingAsyncBase::handleException(const Exception& exc) { @@ -338,8 +339,8 @@ ProxyOutgoingAsyncBase::runTimerTask() } } -OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, - const string& operation, +OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, + const string& operation, const CallbackBasePtr& delegate, const LocalObjectPtr& cookie) : ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), @@ -368,26 +369,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex case Reference::ModeBatchOneway: case Reference::ModeBatchDatagram: { - while(true) - { - try - { - _handler = _proxy->__getRequestHandler(); - _handler->prepareBatchRequest(&_os); - break; - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. - } - catch(const LocalException& ex) - { - _observer.failed(ex.ice_name()); - _proxy->__setRequestHandler(_handler, 0); // Clear request handler - _handler = 0; - throw; - } - } + _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os); break; } } @@ -445,16 +427,16 @@ OutgoingAsync::sent() } AsyncStatus -OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response) +OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response) { _cachedConnection = connection; - return connection->sendAsyncRequest(this, compress, response); + return connection->sendAsyncRequest(this, compress, response, 0); } AsyncStatus OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) { - return handler->invokeAsyncRequest(this); + return handler->invokeAsyncRequest(this, 0); } void @@ -463,17 +445,14 @@ OutgoingAsync::abort(const Exception& ex) const Reference::Mode mode = _proxy->__reference()->getMode(); if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - if(_handler) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - _handler->abortBatchRequest(); - } + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); } - + ProxyOutgoingAsyncBase::abort(ex); } @@ -483,12 +462,9 @@ OutgoingAsync::invoke() const Reference::Mode mode = _proxy->__reference()->getMode(); if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - if(_handler) - { - _sentSynchronously = true; - _handler->finishBatchRequest(&_os); - finished(true); - } + _sentSynchronously = true; + _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, getOperation()); + finished(true); return; // Don't call sent/completed callback for batch AMI requests } @@ -645,50 +621,58 @@ OutgoingAsync::completed() } } -ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) { _observer.attach(proxy.get(), operation, 0); + _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); } AsyncStatus -ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool) +ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool) { + if(_batchRequestNum == 0) + { + if(sent()) + { + return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); + } + else + { + return AsyncStatusSent; + } + } _cachedConnection = connection; - return connection->flushAsyncBatchRequests(this); + return connection->sendAsyncRequest(this, compress, false, _batchRequestNum); } AsyncStatus -ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) +ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) { - return handler->invokeAsyncBatchRequests(this); + if(_batchRequestNum == 0) + { + if(sent()) + { + return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); + } + else + { + return AsyncStatusSent; + } + } + return handler->invokeAsyncRequest(this, _batchRequestNum); } void -ProxyFlushBatch::invoke() +ProxyFlushBatchAsync::invoke() { checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); invokeImpl(true); // userThread = true } -void -ProxyFlushBatch::handleRetryException(const Ice::Exception& ex) -{ - _proxy->__setRequestHandler(_handler, 0); // Clear request handler - ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests -} - -int -ProxyFlushBatch::handleException(const Exception& ex) -{ - _proxy->__setRequestHandler(_handler, 0); // Clear request handler - ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests - return 0; -} - ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, const string& operation, const CallbackBasePtr& delegate, @@ -699,7 +683,7 @@ ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, } AsyncStatus -ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool) +ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; if(finished(true)) @@ -725,29 +709,43 @@ ProxyGetConnection::invoke() invokeImpl(true); // userThread = true } -ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const LocalObjectPtr& cookie) : +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) { _observer.attach(instance.get(), operation); } -ConnectionPtr -ConnectionFlushBatch::getConnection() const +ConnectionPtr +ConnectionFlushBatchAsync::getConnection() const { return _connection; } void -ConnectionFlushBatch::invoke() +ConnectionFlushBatchAsync::invoke() { try { - AsyncStatus status = _connection->flushAsyncBatchRequests(this); + AsyncStatus status; + int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); + if(batchRequestNum == 0) + { + status = AsyncStatusSent; + if(sent()) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } + } + else + { + status = _connection->sendAsyncRequest(this, false, false, batchRequestNum); + } + if(status & AsyncStatusSent) { _sentSynchronously = true; @@ -757,6 +755,13 @@ ConnectionFlushBatch::invoke() } } } + catch(const RetryException& ex) + { + if(completed(*ex.get())) + { + invokeCompletedAsync(); + } + } catch(const Exception& ex) { if(completed(ex)) @@ -766,11 +771,11 @@ ConnectionFlushBatch::invoke() } } -CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& cb, - const LocalObjectPtr& cookie) : +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) : AsyncResult(communicator, instance, operation, cb, cookie) { _observer.attach(instance.get(), operation); @@ -784,17 +789,17 @@ CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicat } void -CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) +CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) { class FlushBatch : public OutgoingAsyncBase { public: - - FlushBatch(const CommunicatorFlushBatchPtr& outAsync, - const InstancePtr& instance, + + FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, + const InstancePtr& instance, InvocationObserver& observer) : OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), - _outAsync(outAsync), + _outAsync(outAsync), _observer(observer) { } @@ -821,7 +826,7 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) return _observer; } - const CommunicatorFlushBatchPtr _outAsync; + const CommunicatorFlushBatchAsyncPtr _outAsync; InvocationObserver& _observer; }; @@ -832,7 +837,16 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) try { - con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer)); + IceUtil::Handle<FlushBatch> flushBatch = new FlushBatch(this, _instance, _observer); + int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); + if(batchRequestNum == 0) + { + flushBatch->sent(); + } + else + { + con->sendAsyncRequest(flushBatch, false, false, batchRequestNum); + } } catch(const LocalException&) { @@ -842,13 +856,13 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) } void -CommunicatorFlushBatch::ready() +CommunicatorFlushBatchAsync::ready() { check(true); } void -CommunicatorFlushBatch::check(bool userThread) +CommunicatorFlushBatchAsync::check(bool userThread) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); |