summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorZeroC Staff <git@zeroc.com>2007-12-12 21:01:52 -0500
committerZeroC Staff <git@zeroc.com>2007-12-12 21:01:52 -0500
commit883a047970693d63716aade9bd94f38c75012c7c (patch)
tree28f414393cc199bd852d618bcaf4702dd380759b /cpp/src/Ice/OutgoingAsync.cpp
parentFixed VC build (diff)
parentFixed bug 2592 (diff)
downloadice-883a047970693d63716aade9bd94f38c75012c7c.tar.bz2
ice-883a047970693d63716aade9bd94f38c75012c7c.tar.xz
ice-883a047970693d63716aade9bd94f38c75012c7c.zip
Merge branch 'master' of bernard@cvs.zeroc.com:/home/git/ice
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp645
1 files changed, 324 insertions, 321 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 4286cc1949c..1a1fe2beda5 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -23,6 +23,7 @@
#include <Ice/Protocol.h>
#include <Ice/ReplyStatus.h>
#include <Ice/ImplicitContextI.h>
+#include <Ice/ThreadPool.h>
using namespace std;
using namespace Ice;
@@ -35,38 +36,178 @@ IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
-IceInternal::OutgoingAsync::OutgoingAsync() :
+
+namespace
+{
+
+class CallException : public ThreadPoolWorkItem
+{
+public:
+
+ CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) :
+ _outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
+ {
+ }
+
+ virtual void
+ execute(const ThreadPoolPtr& threadPool)
+ {
+ threadPool->promoteFollower();
+ _outAsync->__exception(*_exception.get());
+ }
+
+private:
+
+ const OutgoingAsyncMessageCallbackPtr _outAsync;
+ const auto_ptr<Ice::LocalException> _exception;
+};
+
+};
+
+IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() :
__is(0),
__os(0)
{
}
-IceInternal::OutgoingAsync::~OutgoingAsync()
+IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback()
{
assert(!__is);
assert(!__os);
}
void
+IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc)
+{
+ try
+ {
+ ice_exception(exc);
+ }
+ catch(const std::exception& ex)
+ {
+ __warning(ex);
+ }
+ catch(...)
+ {
+ __warning();
+ }
+
+ __release();
+}
+
+void
+IceInternal::OutgoingAsyncMessageCallback::__acquire(const Ice::ObjectPrx& proxy)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
+
+ //
+ // We must first wait for other requests to finish.
+ //
+ while(__os)
+ {
+ __monitor.wait();
+ }
+
+ Instance* instance = proxy->__reference()->getInstance().get();
+ assert(!__is);
+ __is = new BasicStream(instance);
+ assert(!__os);
+ __os = new BasicStream(instance);
+}
+
+void
+IceInternal::OutgoingAsyncMessageCallback::__release(const Ice::LocalException& exc)
+{
+ assert(__os);
+
+ //
+ // This is called by the invoking thread to release the callback following a direct
+ // failure to marhsall/send the request. We call the ice_exception() callback with
+ // the thread pool to avoid potential deadlocks in case the invoking thread locked
+ // some mutexes/resources (which couldn't be re-acquired by the callback).
+ //
+
+ try
+ {
+ __os->instance()->clientThreadPool()->execute(new CallException(this, exc));
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ __release();
+ throw; // CommunicatorDestroyedException is the only exception that can propagate directly.
+ }
+}
+
+void
+IceInternal::OutgoingAsyncMessageCallback::__releaseNoSync()
+{
+ assert(__is);
+ delete __is;
+ __is = 0;
+
+ assert(__os);
+ delete __os;
+ __os = 0;
+
+ __monitor.notify();
+}
+
+void
+IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const
+{
+ if(__os) // Don't print anything if release() was already called.
+ {
+ InstancePtr instance = __os->instance();
+ if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(instance->initializationData().logger);
+ const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc);
+ if(ex)
+ {
+ out << "Ice::Exception raised by AMI callback:\n" << ex;
+ }
+ else
+ {
+ out << "std::exception raised by AMI callback:\n" << exc.what();
+ }
+ }
+ }
+}
+
+void
+IceInternal::OutgoingAsyncMessageCallback::__warning() const
+{
+ if(__os) // Don't print anything if release() was already called.
+ {
+ InstancePtr instance = __os->instance();
+ if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(instance->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
+ }
+ }
+}
+
+void
IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
_sent = true;
if(!_proxy->ice_isTwoway())
{
- cleanup(); // No response expected, we're done with the OutgoingAsync.
+ __releaseNoSync(); // No response expected, we're done with the OutgoingAsync.
}
else if(_response)
{
- _monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
}
else if(connection->timeout() > 0)
{
- assert(!_timerTaskConnection);
+ assert(!_timerTaskConnection && __os);
_timerTaskConnection = connection;
IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
- _proxy->__reference()->getInstance()->timer()->schedule(this, timeout);
+ __os->instance()->timer()->schedule(this, timeout);
}
}
@@ -78,18 +219,18 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
Ice::Byte replyStatus;
try
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
assert(__os);
_response = true;
- if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
+ if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
{
_timerTaskConnection = 0; // Timer cancelled.
}
while(!_sent || _timerTaskConnection)
{
- _monitor.wait();
+ __monitor.wait();
}
__is->swap(is);
@@ -222,419 +363,271 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
}
catch(const std::exception& ex)
{
- warning(ex);
+ __warning(ex);
+ __release();
}
catch(...)
{
- warning();
+ __warning();
+ __release();
}
-
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- cleanup();
}
void
-IceInternal::OutgoingAsync::__finished(const LocalException& exc)
+IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc)
{
- bool retry = false;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
- if(__os) // Might be called from __prepare or before __prepare
- {
- if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
- {
- _timerTaskConnection = 0; // Timer cancelled.
- }
-
- while(_timerTaskConnection)
- {
- _monitor.wait();
- }
-
- //
- // A CloseConnectionException indicates graceful server
- // shutdown, and is therefore always repeatable without
- // violating "at-most-once". That's because by sending a close
- // connection message, the server guarantees that all
- // outstanding requests can safely be repeated. Otherwise, we
- // can also retry if the operation mode is Nonmutating or
- // Idempotent.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once".
- //
- if(!_sent ||
- _mode == Nonmutating || _mode == Idempotent ||
- dynamic_cast<const CloseConnectionException*>(&exc) ||
- dynamic_cast<const ObjectNotExistException*>(&exc))
- {
- retry = true;
- }
- }
- }
-
- if(retry)
{
- try
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
+ assert(__os);
+
+ if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
{
- _proxy->__handleException(_delegate, exc, _cnt);
- __send();
- return;
+ _timerTaskConnection = 0; // Timer cancelled.
}
- catch(const LocalException&)
+
+ while(_timerTaskConnection)
{
+ __monitor.wait();
}
}
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
try
{
- ice_exception(exc);
- }
- catch(const std::exception& ex)
- {
- warning(ex);
+ handleException(exc); // This will throw if the invocation can't be retried.
+ __send();
}
- catch(...)
+ catch(const Ice::LocalException& ex)
{
- warning();
+ __exception(ex);
}
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- cleanup();
}
void
IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& ex)
{
+ assert(__os && !_sent);
+
//
- // NOTE: This is called if sendRequest/sendAsyncRequest fails with
- // a LocalExceptionWrapper exception. It's not possible for the
- // timer to be set at this point because the request couldn't be
- // sent.
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback. The LocalExceptionWrapper exception is only called
+ // before the invocation is sent.
//
- assert(!_sent && !_timerTaskConnection);
try
{
- if(_mode == Nonmutating || _mode == Idempotent)
- {
- _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
- }
- else
- {
- _proxy->__handleExceptionWrapper(_delegate, ex);
- }
+ handleException(ex); // This will throw if the invocation can't be retried.
__send();
}
- catch(const Ice::LocalException& exc)
+ catch(const Ice::LocalException& ex)
{
- try
- {
- ice_exception(exc);
- }
- catch(const std::exception& ex)
- {
- warning(ex);
- }
- catch(...)
- {
- warning();
- }
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- cleanup();
+ __exception(ex);
}
}
void
-IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
+IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
const Context* context)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _proxy = prx;
+ _delegate = 0;
+ _cnt = 0;
+ _mode = mode;
- try
+ //
+ // Can't call async via a batch proxy.
+ //
+ if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
{
- //
- // We must first wait for other requests to finish.
- //
- while(__os)
- {
- _monitor.wait();
- }
-
- //
- // Can't call async via a batch proxy.
- //
- _proxy = prx;
- if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
- {
- throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
- }
- _delegate = 0;
- _cnt = 0;
- _mode = mode;
- _sent = false;
- _response = false;
-
- ReferencePtr ref = _proxy->__reference();
- assert(!__is);
- __is = new BasicStream(ref->getInstance().get());
- assert(!__os);
- __os = new BasicStream(ref->getInstance().get());
-
- __os->writeBlob(requestHdr, sizeof(requestHdr));
-
- ref->getIdentity().__write(__os);
-
- //
- // For compatibility with the old FacetPath.
- //
- if(ref->getFacet().empty())
- {
- __os->write(static_cast<string*>(0), static_cast<string*>(0));
- }
- else
- {
- string facet = ref->getFacet();
- __os->write(&facet, &facet + 1);
- }
+ throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
+ }
- __os->write(operation, false);
+ __os->writeBlob(requestHdr, sizeof(requestHdr));
- __os->write(static_cast<Byte>(_mode));
+ Reference* ref = _proxy->__reference().get();
- if(context != 0)
- {
- //
- // Explicit context
- //
- __writeContext(__os, *context);
- }
- else
- {
- //
- // Implicit context
- //
- const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
- const Context& prxContext = ref->getContext()->getValue();
- if(implicitContext == 0)
- {
- __writeContext(__os, prxContext);
- }
- else
- {
- implicitContext->write(prxContext, __os);
- }
- }
-
- __os->startWriteEncaps();
- }
- catch(const LocalException& ex)
- {
- cleanup();
- ex.ice_throw();
- }
-}
+ ref->getIdentity().__write(__os);
-void
-IceInternal::OutgoingAsync::__send()
-{
//
- // NOTE: no synchronization needed. At this point, no other threads can be calling on this object.
+ // For compatibility with the old FacetPath.
//
- RequestHandler* handler;
- try
+ if(ref->getFacet().empty())
{
- _delegate = _proxy->__getDelegate(true);
- handler = _delegate->__getRequestHandler().get();
+ __os->write(static_cast<string*>(0), static_cast<string*>(0));
}
- catch(const Ice::LocalException& ex)
+ else
{
- __finished(ex);
- return;
+ string facet = ref->getFacet();
+ __os->write(&facet, &facet + 1);
}
- _sent = false;
- _response = false;
- handler->sendAsyncRequest(this);
-}
-
-void
-IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask()
-{
- Ice::ConnectionIPtr connection;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
+ __os->write(operation, false);
- if(!_response) // If the response was just received, don't close the connection.
- {
- connection = _timerTaskConnection;
- }
- _timerTaskConnection = 0;
- _monitor.notifyAll();
- }
+ __os->write(static_cast<Byte>(_mode));
- if(connection)
+ if(context != 0)
{
- connection->exception(Ice::TimeoutException(__FILE__, __LINE__));
+ //
+ // Explicit context
+ //
+ __writeContext(__os, *context);
}
-}
-
-void
-IceInternal::OutgoingAsync::warning(const std::exception& exc) const
-{
- if(__os) // Don't print anything if cleanup() was already called.
+ else
{
- ReferencePtr ref = _proxy->__reference();
- if(ref->getInstance()->initializationData().properties->
- getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ //
+ // Implicit context
+ //
+ const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
+ const Context& prxContext = ref->getContext()->getValue();
+ if(implicitContext == 0)
{
- Warning out(ref->getInstance()->initializationData().logger);
- const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc);
- if(ex)
- {
- out << "Ice::Exception raised by AMI callback:\n" << ex;
- }
- else
- {
- out << "std::exception raised by AMI callback:\n" << exc.what();
- }
+ __writeContext(__os, prxContext);
+ }
+ else
+ {
+ implicitContext->write(prxContext, __os);
}
}
+
+ __os->startWriteEncaps();
}
void
-IceInternal::OutgoingAsync::warning() const
+IceInternal::OutgoingAsync::__send()
{
- if(__os) // Don't print anything if cleanup() was already called.
+ while(true)
{
- ReferencePtr ref = _proxy->__reference();
- if(ref->getInstance()->initializationData().properties->
- getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ try
{
- Warning out(ref->getInstance()->initializationData().logger);
- out << "unknown exception raised by AMI callback";
+ _sent = false;
+ _response = false;
+ _delegate = _proxy->__getDelegate(true);
+ _delegate->__getRequestHandler()->sendAsyncRequest(this);
+ return;
+ }
+ catch(const LocalExceptionWrapper& ex)
+ {
+ handleException(ex);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ handleException(ex);
}
}
}
void
-IceInternal::OutgoingAsync::cleanup()
-{
- assert(!_timerTaskConnection);
-
- delete __is;
- __is = 0;
- delete __os;
- __os = 0;
-
- _monitor.notify();
-}
-
-IceInternal::BatchOutgoingAsync::BatchOutgoingAsync() : _os(0)
-{
-}
-
-void
-IceInternal::BatchOutgoingAsync::__prepare(const InstancePtr& instance)
+IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(_os)
+ if(_mode == Nonmutating || _mode == Idempotent)
{
- _monitor.wait();
+ _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy->__handleExceptionWrapper(_delegate, ex);
}
- _os = new BasicStream(instance.get());
-}
-
-void
-IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- cleanup();
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
+IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
{
try
{
- ice_exception(exc);
+ //
+ // A CloseConnectionException indicates graceful server shutdown, and is therefore
+ // always repeatable without violating "at-most-once". That's because by sending a
+ // close connection message, the server guarantees that all outstanding requests
+ // can safely be repeated.
+ //
+ // An ObjectNotExistException can always be retried as well without violating
+ // "at-most-once".
+ //
+ if(!_sent ||
+ dynamic_cast<const CloseConnectionException*>(&exc) ||
+ dynamic_cast<const ObjectNotExistException*>(&exc))
+ {
+ exc.ice_throw();
+ }
+
+ //
+ // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the
+ // request cannot be resent without potentially violating the "at-most-once"
+ // principle.
+ //
+ throw LocalExceptionWrapper(exc, false);
}
- catch(const std::exception& ex)
+ catch(const LocalExceptionWrapper& ex)
{
- warning(ex);
+ if(_mode == Nonmutating || _mode == Idempotent)
+ {
+ _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy->__handleExceptionWrapper(_delegate, ex);
+ }
}
- catch(...)
+ catch(const Ice::LocalException& ex)
{
- warning();
+ _proxy->__handleException(_delegate, ex, _cnt);
}
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- cleanup();
}
void
-IceInternal::BatchOutgoingAsync::warning(const std::exception& exc) const
+IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask()
{
- if(_os) // Don't print anything if cleanup() was already called.
+ Ice::ConnectionIPtr connection;
{
- if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
+ assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
+
+ if(!_response) // If the response was just received, don't close the connection.
{
- Warning out(_os->instance()->initializationData().logger);
- const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc);
- if(ex)
- {
- out << "Ice::Exception raised by AMI callback:\n" << ex;
- }
- else
- {
- out << "std::exception raised by AMI callback:\n" << exc.what();
- }
+ connection = _timerTaskConnection;
}
+ _timerTaskConnection = 0;
+ __monitor.notifyAll();
}
-}
-void
-IceInternal::BatchOutgoingAsync::warning() const
-{
- if(_os) // Don't print anything if cleanup() was already called.
+ if(connection)
{
- if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
- {
- Warning out(_os->instance()->initializationData().logger);
- out << "unknown exception raised by AMI callback";
- }
+ connection->exception(Ice::TimeoutException(__FILE__, __LINE__));
}
}
void
-IceInternal::BatchOutgoingAsync::cleanup()
+IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
{
- delete _os;
- _os = 0;
+ __release();
+}
- _monitor.notify();
+void
+IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
+{
+ __exception(exc);
}
void
Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
const vector<Byte>& inParams, const Context* context)
{
+ __acquire(prx);
try
{
__prepare(prx, operation, mode, context);
__os->writeBlob(inParams);
__os->endWriteEncaps();
+ __send();
}
- catch(const LocalException& ex)
+ catch(const Ice::LocalException& ex)
{
- __finished(ex);
- return;
+ __release(ex);
}
- __send();
}
void
@@ -652,24 +645,25 @@ Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exce
return;
}
ice_response(ok, outParams);
+ __release();
}
void
Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
const pair<const Byte*, const Byte*>& inParams, const Context* context)
{
+ __acquire(prx);
try
{
__prepare(prx, operation, mode, context);
__os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first));
__os->endWriteEncaps();
+ __send();
}
- catch(const LocalException& ex)
+ catch(const Ice::LocalException& ex)
{
- __finished(ex);
- return;
+ __release(ex);
}
- __send();
}
void
@@ -688,25 +682,34 @@ Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no use
return;
}
ice_response(ok, outParams);
+ __release();
}
void
Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
{
- Handle< ::IceDelegate::Ice::Object> delegate;
- RequestHandler* handler;
+ __acquire(prx);
try
{
- __prepare(prx->__reference()->getInstance());
- delegate = prx->__getDelegate(true);
- handler = delegate->__getRequestHandler().get();
+ //
+ // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
+ // requests were queued with the connection, they would be lost without being noticed.
+ //
+ Handle< ::IceDelegate::Ice::Object> delegate;
+ int cnt = -1; // Don't retry.
+ try
+ {
+ delegate = prx->__getDelegate(true);
+ delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ prx->__handleException(delegate, ex, cnt);
+ }
}
catch(const Ice::LocalException& ex)
{
- __finished(ex);
- return;
+ __release(ex);
}
-
- handler->flushAsyncBatchRequests(this);
}