summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp77
1 files changed, 66 insertions, 11 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 749384821f4..c9d0ac28234 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -456,15 +456,42 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
_observer.attach(_proxy.get(), operation, context);
- //
- // Can't call async via a batch proxy.
- //
- if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
+ switch(_proxy->__reference()->getMode())
{
- throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
- }
+ case Reference::ModeTwoway:
+ case Reference::ModeOneway:
+ case Reference::ModeDatagram:
+ {
+ _os.writeBlob(requestHdr, sizeof(requestHdr));
+ break;
+ }
- _os.writeBlob(requestHdr, sizeof(requestHdr));
+ case Reference::ModeBatchOneway:
+ case Reference::ModeBatchDatagram:
+ {
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy->__getRequestHandler();
+ _handler->prepareBatchRequest(&_os);
+ break;
+ }
+ catch(const RetryException&)
+ {
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _observer.failed(ex.ice_name());
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ _handler = 0;
+ throw;
+ }
+ }
+ break;
+ }
+ }
Reference* ref = _proxy->__reference().get();
@@ -535,7 +562,7 @@ IceInternal::OutgoingAsync::__sent()
_sent = true;
assert(!(_state & Done));
- if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
+ if(!_proxy->ice_isTwoway())
{
_childObserver.detach();
if(!_callback || !_callback->hasSentCallback())
@@ -595,6 +622,25 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
}
void
+IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
+{
+ if((_state & Done) == 0 && _handler)
+ {
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ int mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
+ {
+ _handler->abortBatchRequest();
+ }
+ }
+ AsyncResult::__invokeExceptionAsync(ex);
+}
+
+void
IceInternal::OutgoingAsync::__finished()
{
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
@@ -757,12 +803,21 @@ IceInternal::OutgoingAsync::__finished()
bool
IceInternal::OutgoingAsync::__invoke(bool synchronous)
{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
+ {
+ _state |= Done | OK;
+ _handler->finishBatchRequest(&_os);
+ _observer.detach();
+ return true;
+ }
+
while(true)
{
try
{
_sent = false;
- _handler = _proxy->__getRequestHandler(true);
+ _handler = _proxy->__getRequestHandler();
AsyncStatus status = _handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{
@@ -783,7 +838,7 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
}
}
- if(_proxy->ice_isTwoway() || !(status & AsyncStatusSent))
+ if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent))
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
if(!(_state & Done))
@@ -921,7 +976,7 @@ IceInternal::ProxyBatchOutgoingAsync::__invoke()
RequestHandlerPtr handler;
try
{
- handler = _proxy->__getRequestHandler(true);
+ handler = _proxy->__getRequestHandler();
AsyncStatus status = handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{