Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--  cpp/src/Ice/OutgoingAsync.cpp | 224
1 file changed, 119 insertions(+), 105 deletions(-)
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index f32e5f3b547..513ba5b287d 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -26,7 +26,7 @@ using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
bool
OutgoingAsyncBase::sent()
@@ -40,10 +40,17 @@ OutgoingAsyncBase::completed(const Exception& ex)
return finished(ex);
}
-void
-OutgoingAsyncBase::retryException(const Exception& ex)
+bool
+OutgoingAsyncBase::completed()
+{
+    assert(false); // Must be overridden by requests that can handle responses
+ return false;
+}
+
+BasicStream*
+OutgoingAsyncBase::getIs()
{
- assert(false);
+    return 0; // Must be overridden by requests that can handle responses
}
OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
@@ -129,7 +136,7 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex)
// require could end up waiting for the flush of the
// connection to be done.
//
- handleRetryException(ex);
+ _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry.
_instance->retryQueue()->add(this, 0);
}
catch(const Ice::Exception& exc)
@@ -238,9 +245,9 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
}
return; // We're done!
}
- catch(const RetryException& ex)
+ catch(const RetryException&)
{
- handleRetryException(*ex.get());
+ _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry.
}
catch(const Exception& ex)
{
@@ -267,8 +274,8 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
//
    // If called from the user thread we re-throw; the exception
    // will be caught by the caller and abort() will be called.
- //
- if(userThread)
+ //
+ if(userThread)
{
throw;
}
@@ -313,12 +320,6 @@ ProxyOutgoingAsyncBase::finished(bool ok)
return AsyncResult::finished(ok);
}
-void
-ProxyOutgoingAsyncBase::handleRetryException(const Ice::Exception&)
-{
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry.
-}
-
int
ProxyOutgoingAsyncBase::handleException(const Exception& exc)
{
@@ -338,8 +339,8 @@ ProxyOutgoingAsyncBase::runTimerTask()
}
}
-OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
- const string& operation,
+OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
+ const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
ProxyOutgoingAsyncBase(prx, operation, delegate, cookie),
@@ -368,26 +369,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex
case Reference::ModeBatchOneway:
case Reference::ModeBatchDatagram:
{
- while(true)
- {
- try
- {
- _handler = _proxy->__getRequestHandler();
- _handler->prepareBatchRequest(&_os);
- break;
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
- }
- catch(const LocalException& ex)
- {
- _observer.failed(ex.ice_name());
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler
- _handler = 0;
- throw;
- }
- }
+ _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os);
break;
}
}
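
The hunk above drops the retry loop around the request handler and instead asks the proxy for a batch request queue. The sketch below is not Ice's BatchRequestQueue; it is a minimal stand-in (std::string in place of BasicStream, the proxy/operation arguments omitted, all names assumed) that illustrates the prepare/finish/abort/swap contract the new calls in this diff rely on.

// Simplified stand-in for a batch request queue (illustration only).
#include <condition_variable>
#include <mutex>
#include <string>

class BatchRequestQueue
{
public:
    // Hand the accumulated batch to the caller so it can append one request.
    // Blocks while another thread is between prepare and finish/abort.
    void prepareBatchRequest(std::string* os)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this] { return !_inUse; });
        _inUse = true;
        os->swap(_batchStream);
        _savedSize = os->size(); // remembered so an aborted request can be rolled back
    }

    // Commit the appended request and give the batch back to the queue.
    void finishBatchRequest(std::string* os)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _batchStream.swap(*os);
        ++_requestCount;
        _inUse = false;
        _cond.notify_all();
    }

    // Give up ownership without committing the partially marshaled request.
    void abortBatchRequest(std::string* os)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        os->resize(_savedSize);
        _batchStream.swap(*os);
        _inUse = false;
        _cond.notify_all();
    }

    // Atomically take everything queued so far; returns the request count.
    int swap(std::string* os)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this] { return !_inUse; });
        if(_requestCount == 0)
        {
            return 0;
        }
        os->swap(_batchStream);
        _batchStream.clear();
        int count = _requestCount;
        _requestCount = 0;
        return count;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cond;
    std::string _batchStream;
    std::string::size_type _savedSize = 0;
    int _requestCount = 0;
    bool _inUse = false;
};

Because the queue is obtained from the proxy rather than from a request handler, queuing a batch request no longer needs a connection, which is why the retry logic disappears from prepare().
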
@@ -445,16 +427,16 @@ OutgoingAsync::sent()
}
AsyncStatus
-OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response)
+OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response)
{
_cachedConnection = connection;
- return connection->sendAsyncRequest(this, compress, response);
+ return connection->sendAsyncRequest(this, compress, response, 0);
}
AsyncStatus
OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler)
{
- return handler->invokeAsyncRequest(this);
+ return handler->invokeAsyncRequest(this, 0);
}
void
@@ -463,17 +445,14 @@ OutgoingAsync::abort(const Exception& ex)
const Reference::Mode mode = _proxy->__reference()->getMode();
if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- if(_handler)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- _handler->abortBatchRequest();
- }
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os);
}
-
+
ProxyOutgoingAsyncBase::abort(ex);
}
@@ -483,12 +462,9 @@ OutgoingAsync::invoke()
const Reference::Mode mode = _proxy->__reference()->getMode();
if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- if(_handler)
- {
- _sentSynchronously = true;
- _handler->finishBatchRequest(&_os);
- finished(true);
- }
+ _sentSynchronously = true;
+ _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, getOperation());
+ finished(true);
return; // Don't call sent/completed callback for batch AMI requests
}
@@ -645,50 +621,58 @@ OutgoingAsync::completed()
}
}
-ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie)
{
_observer.attach(proxy.get(), operation, 0);
+ _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os);
}
AsyncStatus
-ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool)
+ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool)
{
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
_cachedConnection = connection;
- return connection->flushAsyncBatchRequests(this);
+ return connection->sendAsyncRequest(this, compress, false, _batchRequestNum);
}
AsyncStatus
-ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
+ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
{
- return handler->invokeAsyncBatchRequests(this);
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
+ return handler->invokeAsyncRequest(this, _batchRequestNum);
}
void
-ProxyFlushBatch::invoke()
+ProxyFlushBatchAsync::invoke()
{
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
invokeImpl(true); // userThread = true
}
-void
-ProxyFlushBatch::handleRetryException(const Ice::Exception& ex)
-{
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler
- ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
-}
-
-int
-ProxyFlushBatch::handleException(const Exception& ex)
-{
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler
- ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
- return 0;
-}
-
ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
const string& operation,
const CallbackBasePtr& delegate,
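
ProxyFlushBatchAsync now captures the queued request count with swap() in its constructor and, when that count is zero, completes without touching the connection. The sketch below condenses that decision; the AsyncStatus values are assumed here to mirror the flags used in the diff, and the send callback is a stand-in for connection->sendAsyncRequest(..., batchRequestNum).

#include <functional>
#include <string>

enum AsyncStatus
{
    AsyncStatusQueued = 0,
    AsyncStatusSent = 1,
    AsyncStatusInvokeSentCallback = 2
};

AsyncStatus
flushSwappedBatch(const std::string& os,
                  int batchRequestNum,
                  bool invokeSentCallback,
                  const std::function<AsyncStatus(const std::string&, int)>& send)
{
    if(batchRequestNum == 0)
    {
        // Nothing was queued: report the flush as sent immediately, and ask the
        // caller to run the sent callback if one is registered.
        return invokeSentCallback
            ? static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback)
            : AsyncStatusSent;
    }
    // Send the whole batch as a single request that carries the request count.
    return send(os, batchRequestNum);
}

Composing the status with a bit-or lets one return value say both "the request is done" and "the sent callback still has to run", which is what the if(sent()) branches above compute.
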
@@ -699,7 +683,7 @@ ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
}
AsyncStatus
-ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool)
+ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
{
_cachedConnection = connection;
if(finished(true))
@@ -725,29 +709,43 @@ ProxyGetConnection::invoke()
invokeImpl(true); // userThread = true
}
-ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection,
- const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection)
{
_observer.attach(instance.get(), operation);
}
-ConnectionPtr
-ConnectionFlushBatch::getConnection() const
+ConnectionPtr
+ConnectionFlushBatchAsync::getConnection() const
{
return _connection;
}
void
-ConnectionFlushBatch::invoke()
+ConnectionFlushBatchAsync::invoke()
{
try
{
- AsyncStatus status = _connection->flushAsyncBatchRequests(this);
+ AsyncStatus status;
+ int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os);
+ if(batchRequestNum == 0)
+ {
+ status = AsyncStatusSent;
+ if(sent())
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ }
+ else
+ {
+ status = _connection->sendAsyncRequest(this, false, false, batchRequestNum);
+ }
+
if(status & AsyncStatusSent)
{
_sentSynchronously = true;
@@ -757,6 +755,13 @@ ConnectionFlushBatch::invoke()
}
}
}
+ catch(const RetryException& ex)
+ {
+ if(completed(*ex.get()))
+ {
+ invokeCompletedAsync();
+ }
+ }
catch(const Exception& ex)
{
if(completed(ex))
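
The new catch(const RetryException&) clause does not retry: the flush is tied to one connection and its batch has already been swapped out of the queue, so the exception carried inside the RetryException is surfaced through the completion callback instead. A small sketch of that shape, with stand-in exception types rather than Ice's definitions:

#include <functional>
#include <memory>
#include <stdexcept>

// Stand-ins for illustration; Ice defines its own exception hierarchy.
struct LocalException : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Assumed shape: a retry marker that carries the exception which triggered it.
class RetryException
{
public:
    explicit RetryException(const LocalException& ex) :
        _ex(std::make_shared<LocalException>(ex))
    {
    }

    const LocalException* get() const { return _ex.get(); }

private:
    std::shared_ptr<LocalException> _ex;
};

// Run the flush; any failure, retriable or not, ends up in the completion callback.
void
flushWithoutRetry(const std::function<void()>& sendBatch,
                  const std::function<void(const LocalException&)>& completed)
{
    try
    {
        sendBatch();
    }
    catch(const RetryException& ex)
    {
        completed(*ex.get()); // no retry: report the underlying failure
    }
    catch(const LocalException& ex)
    {
        completed(ex);
    }
}
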
@@ -766,11 +771,11 @@ ConnectionFlushBatch::invoke()
}
}
-CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& cb,
- const LocalObjectPtr& cookie) :
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie) :
AsyncResult(communicator, instance, operation, cb, cookie)
{
_observer.attach(instance.get(), operation);
@@ -784,17 +789,17 @@ CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicat
}
void
-CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
+CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
{
class FlushBatch : public OutgoingAsyncBase
{
public:
-
- FlushBatch(const CommunicatorFlushBatchPtr& outAsync,
- const InstancePtr& instance,
+
+ FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ const InstancePtr& instance,
InvocationObserver& observer) :
OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
- _outAsync(outAsync),
+ _outAsync(outAsync),
_observer(observer)
{
}
@@ -821,7 +826,7 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
return _observer;
}
- const CommunicatorFlushBatchPtr _outAsync;
+ const CommunicatorFlushBatchAsyncPtr _outAsync;
InvocationObserver& _observer;
};
@@ -832,7 +837,16 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
try
{
- con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer));
+ IceUtil::Handle<FlushBatch> flushBatch = new FlushBatch(this, _instance, _observer);
+ int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs());
+ if(batchRequestNum == 0)
+ {
+ flushBatch->sent();
+ }
+ else
+ {
+ con->sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ }
}
catch(const LocalException&)
{
@@ -842,13 +856,13 @@ CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
}
void
-CommunicatorFlushBatch::ready()
+CommunicatorFlushBatchAsync::ready()
{
check(true);
}
void
-CommunicatorFlushBatch::check(bool userThread)
+CommunicatorFlushBatchAsync::check(bool userThread)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
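
CommunicatorFlushBatchAsync starts one FlushBatch per connection and must not report completion until every one of them has checked in; check(userThread) and ready() perform that bookkeeping. The class below is one way to structure the same counting with standard library types only; it is a sketch, not Ice's implementation, and every name in it is invented.

#include <functional>
#include <mutex>
#include <utility>

class AllConnectionsFlush
{
public:
    explicit AllConnectionsFlush(std::function<void()> onDone) : _onDone(std::move(onDone)) {}

    // Called once per connection before its per-connection flush is started.
    void connectionStarted()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        ++_useCount;
    }

    // Called by each per-connection flush when it finishes (sent or failed).
    void connectionFinished()
    {
        bool done = false;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            done = (--_useCount == 0) && _ready;
        }
        if(done)
        {
            _onDone();
        }
    }

    // Called once after all connections have been enumerated (no further
    // connectionStarted() calls may follow), analogous to ready() above.
    void ready()
    {
        bool done = false;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _ready = true;
            done = (_useCount == 0);
        }
        if(done)
        {
            _onDone();
        }
    }

private:
    std::mutex _mutex;
    std::function<void()> _onDone;
    int _useCount = 0;
    bool _ready = false;
};
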