summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
commit570455a381e6620f8ddfcca448559d3fa545ba38 (patch)
treefe3fa45e6a643b473d9370babff6224b1a9d4dcb /cpp/src
parentFixed ICE-5726: provide deprecated public StringConverterPlugin (diff)
downloadice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2
ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz
ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/AsyncResult.cpp602
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp104
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.h24
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp19
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp43
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.h12
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp5
-rw-r--r--cpp/src/Ice/ConnectionFactory.h4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp176
-rw-r--r--cpp/src/Ice/ConnectionI.h25
-rw-r--r--cpp/src/Ice/ConnectionRequestHandler.cpp14
-rw-r--r--cpp/src/Ice/ConnectionRequestHandler.h8
-rw-r--r--cpp/src/Ice/Exception.cpp7
-rw-r--r--cpp/src/Ice/Makefile1
-rw-r--r--cpp/src/Ice/Makefile.mak1
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.cpp2
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.h2
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp2
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h2
-rw-r--r--cpp/src/Ice/Outgoing.cpp232
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp1268
-rw-r--r--cpp/src/Ice/Proxy.cpp94
-rw-r--r--cpp/src/Ice/ProxyFactory.cpp2
-rw-r--r--cpp/src/Ice/RequestHandler.cpp5
-rw-r--r--cpp/src/Ice/RequestHandler.h19
-rw-r--r--cpp/src/Ice/RetryQueue.cpp58
-rw-r--r--cpp/src/Ice/RetryQueue.h15
-rw-r--r--cpp/src/Ice/winrt/Makefile.mak3
-rw-r--r--cpp/src/IceDiscovery/LocatorI.h2
-rw-r--r--cpp/src/IceDiscovery/LookupI.h1
-rw-r--r--cpp/src/IceGrid/AdapterCache.h2
-rw-r--r--cpp/src/slice2cpp/Gen.cpp20
-rw-r--r--cpp/src/slice2cs/Gen.cpp8
-rw-r--r--cpp/src/slice2java/Gen.cpp8
34 files changed, 1443 insertions, 1347 deletions
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp
new file mode 100644
index 00000000000..03abc55e020
--- /dev/null
+++ b/cpp/src/Ice/AsyncResult.cpp
@@ -0,0 +1,602 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/AsyncResult.h>
+#include <Ice/ThreadPool.h>
+#include <Ice/Instance.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Properties.h>
+#include <Ice/RequestHandler.h>
+#include <Ice/OutgoingAsync.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; }
+
+const unsigned char Ice::AsyncResult::OK = 0x1;
+const unsigned char Ice::AsyncResult::Done = 0x2;
+const unsigned char Ice::AsyncResult::Sent = 0x4;
+const unsigned char Ice::AsyncResult::EndCalled = 0x8;
+
+void
+AsyncResult::cancel()
+{
+ cancel(InvocationCanceledException(__FILE__, __LINE__));
+}
+
+Int
+AsyncResult::getHash() const
+{
+ return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
+}
+
+CommunicatorPtr
+AsyncResult::getCommunicator() const
+{
+ return _communicator;
+}
+
+ConnectionPtr
+AsyncResult::getConnection() const
+{
+ return 0;
+}
+
+ObjectPrx
+AsyncResult::getProxy() const
+{
+ return 0;
+}
+
+bool
+AsyncResult::isCompleted() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & Done;
+}
+
+void
+AsyncResult::waitForCompleted()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & Done))
+ {
+ _monitor.wait();
+ }
+}
+
+bool
+AsyncResult::isSent() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & Sent;
+}
+
+void
+AsyncResult::waitForSent()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & Sent) && !_exception.get())
+ {
+ _monitor.wait();
+ }
+}
+
+void
+AsyncResult::throwLocalException() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_exception.get())
+ {
+ _exception.get()->ice_throw();
+ }
+}
+
+bool
+AsyncResult::sentSynchronously() const
+{
+ return _sentSynchronously;
+}
+
+LocalObjectPtr
+AsyncResult::getCookie() const
+{
+ return _cookie;
+}
+
+const std::string&
+AsyncResult::getOperation() const
+{
+ return _operation;
+}
+
+void
+AsyncResult::__throwUserException()
+{
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException();
+ }
+ catch(const Ice::UserException&)
+ {
+ _is.endReadEncaps();
+ throw;
+ }
+}
+
+bool
+AsyncResult::__wait()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_state & EndCalled)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
+ }
+ _state |= EndCalled;
+ while(!(_state & Done))
+ {
+ _monitor.wait();
+ }
+ if(_exception.get())
+ {
+ _exception.get()->ice_throw();
+ }
+ return _state & OK;
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
+{
+ __check(r, operation);
+ if(r->getProxy().get() != prx)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding begin_" +
+ operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
+{
+ __check(r, operation);
+ if(r->getCommunicator().get() != com)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
+{
+ __check(r, operation);
+ if(r->getConnection().get() != con)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
+ " does not match connection that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
+{
+ if(!r)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
+ }
+ else if(&r->_operation != &operation)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
+ " method: " + r->_operation);
+ }
+}
+
+AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
+ const IceInternal::InstancePtr& instance,
+ const string& op,
+ const CallbackBasePtr& del,
+ const LocalObjectPtr& cookie) :
+ _instance(instance),
+ _sentSynchronously(false),
+ _is(instance.get(), Ice::currentProtocolEncoding),
+ _communicator(communicator),
+ _operation(op),
+ _callback(del),
+ _cookie(cookie),
+ _state(0)
+{
+ if(!_callback)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
+ }
+ const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie);
+}
+
+AsyncResult::~AsyncResult()
+{
+}
+
+bool
+AsyncResult::sent(bool done)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(!_exception.get());
+
+ bool alreadySent = _state & Sent;
+ _state |= Sent;
+ if(done)
+ {
+ _state |= Done | OK;
+ _cancellationHandler = 0;
+ if(!_callback || !_callback->hasSentCallback())
+ {
+ _observer.detach();
+ }
+ }
+
+ _monitor.notifyAll();
+ return !alreadySent && _callback && _callback->hasSentCallback();
+}
+
+bool
+AsyncResult::finished(bool ok)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ if(ok)
+ {
+ _state |= OK;
+ }
+ _cancellationHandler = 0;
+ if(!_callback)
+ {
+ _observer.detach();
+ }
+ _monitor.notifyAll();
+ return _callback;
+}
+
+bool
+AsyncResult::finished(const Ice::Exception& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _exception.reset(ex.ice_clone());
+ _cancellationHandler = 0;
+ _observer.failed(ex.ice_name());
+ if(!_callback)
+ {
+ _observer.detach();
+ }
+ _monitor.notifyAll();
+ return _callback;
+}
+
+void
+AsyncResult::invokeSentAsync()
+{
+ class AsynchronousSent : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousSent(const ConnectionPtr& connection, const AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _result(result)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _result->invokeSent();
+ }
+
+ private:
+
+ const AsyncResultPtr _result;
+ };
+
+ //
+ // This is called when it's not safe to call the sent callback
+ // synchronously from this thread. Instead the exception callback
+ // is called asynchronously from the client thread pool.
+ //
+ try
+ {
+ _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this));
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ }
+}
+
+void
+AsyncResult::invokeCompletedAsync()
+{
+ class AsynchronousCompleted : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousCompleted(const ConnectionPtr& connection, const AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _result(result)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _result->invokeCompleted();
+ }
+
+ private:
+
+ const AsyncResultPtr _result;
+ };
+
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ _instance->clientThreadPool()->dispatch(new AsynchronousCompleted(_cachedConnection, this));
+}
+
+void
+AsyncResult::invokeSent()
+{
+ assert(_callback);
+
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->sent(self);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ if(_observer)
+ {
+ ObjectPrx proxy = getProxy();
+ if(!proxy || !proxy->ice_isTwoway())
+ {
+ _observer.detach();
+ }
+ }
+}
+
+void
+AsyncResult::invokeCompleted()
+{
+ assert(_callback);
+
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->completed(self);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ _observer.detach();
+}
+
+void
+AsyncResult::cancel(const Ice::LocalException& ex)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _cancellationException.reset(ex.ice_clone());
+ if(!_cancellationHandler)
+ {
+ return;
+ }
+ }
+ _cancellationHandler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex);
+}
+
+void
+AsyncResult::cancelable(const CancellationHandlerPtr& handler)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(!_cancellationException.get())
+ {
+ _cancellationHandler = handler;
+ return;
+ }
+ }
+ handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get());
+}
+
+void
+AsyncResult::checkCanceled()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_cancellationException.get())
+ {
+ _cancellationException->ice_throw();
+ }
+}
+
+void
+AsyncResult::warning(const std::exception& exc) const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ const Exception* ex = dynamic_cast<const Exception*>(&exc);
+ if(ex)
+ {
+ out << "Ice::Exception raised by AMI callback:\n" << *ex;
+ }
+ else
+ {
+ out << "std::exception raised by AMI callback:\n" << exc.what();
+ }
+ }
+}
+
+void
+AsyncResult::warning() const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
+ }
+}
+
+
+namespace
+{
+
+//
+// Dummy class derived from CallbackBase
+// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
+// this allows us to test whether the user supplied a null delegate instance to the
+// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
+// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
+// object code.
+//
+class DummyCallback : public CallbackBase
+{
+public:
+
+ DummyCallback()
+ {
+ }
+
+ virtual void
+ completed(const Ice::AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual CallbackBasePtr
+ verify(const Ice::LocalObjectPtr&)
+ {
+ //
+ // Called by the AsyncResult constructor to verify the delegate. The dummy
+ // delegate is passed when the user used a begin_ method without delegate.
+ // By returning 0 here, we tell the AsyncResult that no delegates was
+ // provided.
+ //
+ return 0;
+ }
+
+ virtual void
+ sent(const AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual bool
+ hasSentCallback() const
+ {
+ assert(false);
+ return false;
+ }
+};
+
+}
+
+//
+// This gives a pointer value to compare against in the generated
+// begin_ method to decide whether the caller passed a null pointer
+// versus the generated inline version of the begin_ method having
+// passed a pointer to the dummy delegate.
+//
+CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
+
+#ifdef ICE_CPP11
+
+Ice::CallbackPtr
+Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed,
+ const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent)
+{
+ class Cpp11CB : public GenericCallbackBase
+ {
+ public:
+
+ Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed,
+ const ::std::function<void (const AsyncResultPtr&)>& sent) :
+ _completed(completed),
+ _sent(sent)
+ {
+ checkCallback(true, completed != nullptr);
+ }
+
+ virtual void
+ completed(const AsyncResultPtr& result) const
+ {
+ _completed(result);
+ }
+
+ virtual CallbackBasePtr
+ verify(const LocalObjectPtr&)
+ {
+ return this; // Nothing to do, the cookie is not type-safe.
+ }
+
+ virtual void
+ sent(const AsyncResultPtr& result) const
+ {
+ if(_sent != nullptr)
+ {
+ _sent(result);
+ }
+ }
+
+ virtual bool
+ hasSentCallback() const
+ {
+ return _sent != nullptr;
+ }
+
+ private:
+
+ ::std::function< void (const AsyncResultPtr&)> _completed;
+ ::std::function< void (const AsyncResultPtr&)> _sent;
+ };
+
+ return new Cpp11CB(completed, sent);
+}
+#endif
+
+void
+IceInternal::CallbackBase::checkCallback(bool obj, bool cb)
+{
+ if(!obj)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null");
+ }
+ if(!cb)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null");
+ }
+}
+
+
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 0b08198726a..3fc321735cb 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -14,6 +14,8 @@
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/TraceLevels.h>
+#include <Ice/Outgoing.h>
+#include <Ice/OutgoingAsync.h>
#include <Ice/TraceUtil.h>
@@ -28,7 +30,7 @@ class InvokeAll : public DispatchWorkItem
{
public:
- InvokeAll(OutgoingMessageCallback* out,
+ InvokeAll(OutgoingBase* out,
BasicStream* os,
CollocatedRequestHandler* handler,
Int requestId,
@@ -49,7 +51,7 @@ public:
private:
- OutgoingMessageCallback* _out;
+ OutgoingBase* _out;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
@@ -61,7 +63,7 @@ class InvokeAllAsync : public DispatchWorkItem
{
public:
- InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync,
+ InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync,
BasicStream* os,
CollocatedRequestHandler* handler,
Int requestId,
@@ -82,7 +84,7 @@ public:
private:
- OutgoingAsyncMessageCallbackPtr _outAsync;
+ OutgoingAsyncBasePtr _outAsync;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
@@ -113,7 +115,7 @@ public:
private:
const CollocatedRequestHandlerPtr _handler;
- const OutgoingAsyncMessageCallbackPtr _outAsync;
+ const OutgoingAsyncBasePtr _outAsync;
BasicStream _stream;
Int _invokeNum;
};
@@ -261,24 +263,24 @@ CollocatedRequestHandler::abortBatchRequest()
}
bool
-CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out)
+CollocatedRequestHandler::sendRequest(OutgoingBase* out)
{
out->invokeCollocated(this);
return !_response && _reference->getInvocationTimeout() == 0;
}
AsyncStatus
-CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync)
+CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync)
{
- return outAsync->__invokeCollocated(this);
+ return outAsync->invokeCollocated(this);
}
void
-CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
+CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex)
{
Lock sync(*this);
- map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out);
+ map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out);
if(p != _sendRequests.end())
{
if(p->second > 0)
@@ -286,7 +288,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
_requests.erase(p->second);
}
InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex);
+ out->completed(ex);
_sendRequests.erase(p);
return;
}
@@ -299,7 +301,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
if(q->second == o)
{
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
_requests.erase(q);
return; // We're done.
}
@@ -307,12 +309,12 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
}
}
-void
-CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+void
+CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
Lock sync(*this);
-
- map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
+
+ map<OutgoingAsyncBasePtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
if(p != _sendAsyncRequests.end())
{
if(p->second > 0)
@@ -320,7 +322,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac
_asyncRequests.erase(p->second);
}
_sendAsyncRequests.erase(p);
- outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
return;
}
@@ -332,7 +337,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac
if(q->second.get() == o.get())
{
_asyncRequests.erase(q);
- outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
return;
}
}
@@ -391,16 +399,17 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
{
_sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
+ outAsync->cancelable(this);
}
- outAsync->__attachCollocatedObserver(_adapter, requestId);
+ outAsync->attachCollocatedObserver(_adapter, requestId);
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false));
return AsyncStatusQueued;
}
void
-CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
+CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out)
{
Int invokeNum;
{
@@ -457,7 +466,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
}
AsyncStatus
-CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
+CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
{
Int invokeNum;
{
@@ -473,10 +482,12 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
if(_reference->getInvocationTimeout() > 0)
{
_sendAsyncRequests.insert(make_pair(outAsync, 0));
+
+ outAsync->cancelable(this);
}
assert(!_batchStream.b.empty());
- _batchStream.swap(*outAsync->__getOs());
+ _batchStream.swap(*outAsync->getOs());
//
// Reset the batch stream.
@@ -488,14 +499,14 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
}
}
- outAsync->__attachCollocatedObserver(_adapter, 0);
+ outAsync->attachCollocatedObserver(_adapter, 0);
if(invokeNum > 0)
{
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true));
return AsyncStatusQueued;
}
- else if(outAsync->__sent())
+ else if(outAsync->sent())
{
return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
}
@@ -512,7 +523,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
{
Lock sync(*this);
assert(_response);
-
+
os->i = os->b.begin() + sizeof(replyHdr) + 4;
if(_traceLevels->protocol >= 1)
@@ -524,7 +535,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
map<int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
- p->second->finished(*os);
+ p->second->completed(*os);
_requests.erase(p);
}
else
@@ -532,17 +543,21 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
- os->swap(*q->second->__getIs());
- outAsync = q->second;
+ os->swap(*q->second->getIs());
+ if(q->second->completed())
+ {
+ outAsync = q->second;
+ }
_asyncRequests.erase(q);
}
}
}
- if(outAsync && outAsync->__finished())
+ if(outAsync)
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
+
_adapter->decDirectCount();
}
@@ -563,12 +578,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException&
void
CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum)
{
- if(requestId > 0)
- {
- Lock sync(*this);
- _requests.erase(requestId);
- _asyncRequests.erase(requestId);
- }
+ handleException(requestId, ex);
_adapter->decDirectCount();
}
@@ -585,7 +595,7 @@ CollocatedRequestHandler::waitForConnection()
}
bool
-CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
+CollocatedRequestHandler::sent(OutgoingBase* out)
{
if(_reference->getInvocationTimeout() > 0)
{
@@ -600,7 +610,7 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
}
bool
-CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync)
+CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
{
if(_reference->getInvocationTimeout() > 0)
{
@@ -610,9 +620,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync)
return false; // The request timed-out.
}
}
- if(outAsync->__sent())
+ if(outAsync->sent())
{
- outAsync->__invokeSent();
+ outAsync->invokeSent();
}
return true;
}
@@ -684,7 +694,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
map<int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
- p->second->finished(ex);
+ p->second->completed(ex);
_requests.erase(p);
}
else
@@ -692,13 +702,17 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
- outAsync = q->second;
+ if(q->second->completed(ex))
+ {
+ outAsync = q->second;
+ }
_asyncRequests.erase(q);
}
}
}
+
if(outAsync)
{
- outAsync->__finished(ex);
+ outAsync->invokeCompleted();
}
}
diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h
index a3ac1387045..3930c12ce1c 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.h
+++ b/cpp/src/Ice/CollocatedRequestHandler.h
@@ -31,10 +31,10 @@ typedef IceUtil::Handle<ObjectAdapterI> ObjectAdapterIPtr;
namespace IceInternal
{
+class OutgoingBase;
class Outgoing;
-class BatchOutgoing;
+class OutgoingAsyncBase;
class OutgoingAsync;
-class BatchOutgoingAsync;
class CollocatedRequestHandler : public RequestHandler, public ResponseHandler, private IceUtil::Monitor<IceUtil::Mutex>
{
@@ -50,11 +50,11 @@ public:
virtual void finishBatchRequest(BasicStream*);
virtual void abortBatchRequest();
- virtual bool sendRequest(OutgoingMessageCallback*);
- virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&);
+ virtual bool sendRequest(OutgoingBase*);
+ virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&);
- virtual void requestTimedOut(OutgoingMessageCallback*);
- virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&);
+ virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
+ virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte);
virtual void sendNoResponse();
@@ -68,11 +68,11 @@ public:
void invokeRequest(Outgoing*);
AsyncStatus invokeAsyncRequest(OutgoingAsync*);
- void invokeBatchRequests(BatchOutgoing*);
- AsyncStatus invokeAsyncBatchRequests(BatchOutgoingAsync*);
+ void invokeBatchRequests(OutgoingBase*);
+ AsyncStatus invokeAsyncBatchRequests(OutgoingAsyncBase*);
- bool sent(OutgoingMessageCallback*);
- bool sentAsync(OutgoingAsyncMessageCallback*);
+ bool sent(OutgoingBase*);
+ bool sentAsync(OutgoingAsyncBase*);
void invokeAll(BasicStream*, Ice::Int, Ice::Int, bool);
@@ -88,8 +88,8 @@ private:
int _requestId;
- std::map<OutgoingMessageCallback*, Ice::Int> _sendRequests;
- std::map<OutgoingAsyncMessageCallbackPtr, Ice::Int> _sendAsyncRequests;
+ std::map<OutgoingBase*, Ice::Int> _sendRequests;
+ std::map<OutgoingAsyncBasePtr, Ice::Int> _sendAsyncRequests;
std::map<Ice::Int, Outgoing*> _requests;
std::map<Ice::Int, OutgoingAsyncPtr> _asyncRequests;
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index 9b7ce8b8cd8..a7c2aee35d2 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -21,6 +21,7 @@
#include <Ice/DefaultsAndOverrides.h>
#include <Ice/TraceLevels.h>
#include <Ice/Router.h>
+#include <Ice/OutgoingAsync.h>
#include <IceUtil/Mutex.h>
#include <IceUtil/MutexPtrLock.h>
#include <IceUtil/UUID.h>
@@ -198,8 +199,7 @@ Ice::CommunicatorI::getPluginManager() const
void
Ice::CommunicatorI::flushBatchRequests()
{
- AsyncResultPtr r = begin_flushBatchRequests();
- end_flushBatchRequests(r);
+ end_flushBatchRequests(begin_flushBatchRequests());
}
AsyncResultPtr
@@ -223,9 +223,8 @@ Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBa
#ifdef ICE_CPP11
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(
- const IceInternal::Function<void (const Exception&)>& exception,
- const IceInternal::Function<void (bool)>& sent)
+Ice::CommunicatorI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception,
+ const IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
{
@@ -268,8 +267,7 @@ const ::std::string __flushBatchRequests_name = "flushBatchRequests";
}
AsyncResultPtr
-Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb,
- const LocalObjectPtr& cookie)
+Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
OutgoingConnectionFactoryPtr connectionFactory = _instance->outgoingConnectionFactory();
ObjectAdapterFactoryPtr adapterFactory = _instance->objectAdapterFactory();
@@ -278,8 +276,11 @@ Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePt
// This callback object receives the results of all invocations
// of Connection::begin_flushBatchRequests.
//
- CommunicatorBatchOutgoingAsyncPtr result =
- new CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb, cookie);
+ CommunicatorFlushBatchPtr result = new CommunicatorFlushBatch(this,
+ _instance,
+ __flushBatchRequests_name,
+ cb,
+ cookie);
connectionFactory->flushAsyncBatchRequests(result);
adapterFactory->flushAsyncBatchRequests(result);
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 5a9a84c83b1..7d7f4a9291c 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -49,7 +49,7 @@ class FlushSentRequests : public DispatchWorkItem
{
public:
- FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) :
+ FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncBasePtr>& callbacks) :
DispatchWorkItem(connection), _callbacks(callbacks)
{
}
@@ -57,15 +57,15 @@ public:
virtual void
run()
{
- for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p)
+ for(vector<OutgoingAsyncBasePtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p)
{
- (*p)->__invokeSent();
+ (*p)->invokeSent();
}
}
private:
- vector<OutgoingAsyncMessageCallbackPtr> _callbacks;
+ vector<OutgoingAsyncBasePtr> _callbacks;
};
};
@@ -202,7 +202,7 @@ ConnectRequestHandler::abortBatchRequest()
}
bool
-ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out)
+ConnectRequestHandler::sendRequest(OutgoingBase* out)
{
{
Lock sync(*this);
@@ -225,7 +225,7 @@ ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out)
}
AsyncStatus
-ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out)
+ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
{
{
Lock sync(*this);
@@ -236,6 +236,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o
Request req;
req.outAsync = out;
_requests.push_back(req);
+ out->cancelable(this);
return AsyncStatusQueued;
}
}
@@ -244,11 +245,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o
throw RetryException(ex);
}
}
- return out->__send(_connection, _compress, _response);
+ return out->send(_connection, _compress, _response);
}
void
-ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
+ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
{
{
Lock sync(*this);
@@ -263,8 +264,7 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
{
if(p->out == out)
{
- Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex);
+ out->completed(ex);
_requests.erase(p);
return;
}
@@ -272,11 +272,11 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection->requestTimedOut(out);
+ _connection->requestCanceled(out, ex);
}
void
-ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex)
{
{
Lock sync(*this);
@@ -292,14 +292,16 @@ ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPt
if(p->outAsync.get() == outAsync.get())
{
_requests.erase(p);
- outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
return;
}
}
- assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection->asyncRequestTimedOut(outAsync);
+ _connection->asyncRequestCanceled(outAsync, ex);
}
Ice::ConnectionIPtr
@@ -451,7 +453,7 @@ ConnectRequestHandler::flushRequests()
_flushing = true;
}
- vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks;
+ vector<OutgoingAsyncBasePtr> sentCallbacks;
try
{
while(!_requests.empty()) // _requests is immutable when _flushing = true
@@ -463,7 +465,7 @@ ConnectRequestHandler::flushRequests()
}
else if(req.outAsync)
{
- if(req.outAsync->__send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
+ if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
sentCallbacks.push_back(req.outAsync);
}
@@ -551,11 +553,14 @@ ConnectRequestHandler::flushRequestsWithException()
{
if(p->out)
{
- p->out->finished(*_exception.get());
+ p->out->completed(*_exception.get());
}
else if(p->outAsync)
{
- p->outAsync->__finished(*_exception.get());
+ if(p->outAsync->completed(*_exception.get()))
+ {
+ p->outAsync->invokeCompleted();
+ }
}
else
{
diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h
index 095edda6123..c5bc6602766 100644
--- a/cpp/src/Ice/ConnectRequestHandler.h
+++ b/cpp/src/Ice/ConnectRequestHandler.h
@@ -42,11 +42,11 @@ public:
virtual void finishBatchRequest(BasicStream*);
virtual void abortBatchRequest();
- virtual bool sendRequest(OutgoingMessageCallback*);
- virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&);
+ virtual bool sendRequest(OutgoingBase*);
+ virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&);
- virtual void requestTimedOut(OutgoingMessageCallback*);
- virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&);
+ virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
+ virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
virtual Ice::ConnectionIPtr getConnection();
virtual Ice::ConnectionIPtr waitForConnection();
@@ -69,8 +69,8 @@ private:
{
}
- OutgoingMessageCallback* out;
- OutgoingAsyncMessageCallbackPtr outAsync;
+ OutgoingBase* out;
+ OutgoingAsyncBasePtr outAsync;
BasicStream* os;
};
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index f859ac3fa58..f1fc0380727 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -24,6 +24,7 @@
#include <Ice/RouterInfo.h>
#include <Ice/LocalException.h>
#include <Ice/Functional.h>
+#include <Ice/OutgoingAsync.h>
#include <IceUtil/Random.h>
#include <iterator>
@@ -432,7 +433,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
}
void
-IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync)
+IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync)
{
list<ConnectionIPtr> c;
@@ -1357,7 +1358,7 @@ IceInternal::IncomingConnectionFactory::connections() const
}
void
-IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync)
+IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync)
{
list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here.
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index bd0bbe30804..92603a55b04 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -66,7 +66,7 @@ public:
const CreateConnectionCallbackPtr&);
void setRouterInfo(const RouterInfoPtr&);
void removeAdapter(const Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&);
private:
@@ -178,7 +178,7 @@ public:
EndpointIPtr endpoint() const;
std::list<Ice::ConnectionIPtr> connections() const;
- void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&);
//
// Operations from EventHandler
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 1bdbeb12448..92dc4a1693d 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -242,7 +242,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
}
void
-Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
+Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream)
{
assert((out || outAsync)); // Only requests can timeout.
out = 0;
@@ -253,7 +253,7 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
}
else
{
- assert(!adopted && !stream);
+ assert(!adopted);
}
}
@@ -273,25 +273,28 @@ Ice::ConnectionI::OutgoingMessage::sent()
else if(outAsync)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- invokeSent = outAsync->__sent();
+ invokeSent = outAsync->sent();
return invokeSent || receivedReply;
#else
- return outAsync->__sent();
+ return outAsync->sent();
#endif
}
return false;
}
void
-Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex)
+Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex)
{
if(out)
{
- out->finished(ex);
+ out->completed(ex);
}
else if(outAsync)
{
- outAsync->__finished(ex);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompleted();
+ }
}
if(adopted)
@@ -651,8 +654,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
#endif
}
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- static_cast<Int>(os->b.size() - headerSize - 4));
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
//
// Send the message. If it can't be sent without blocking the message is added
@@ -685,7 +687,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
AsyncStatus
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
{
- BasicStream* os = out->__getOs();
+ BasicStream* os = out->getOs();
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_exception.get())
@@ -731,8 +733,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
- out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- static_cast<Int>(os->b.size() - headerSize - 4));
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
AsyncStatus status = AsyncStatusQueued;
try
@@ -747,6 +748,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
_exception->ice_throw();
}
+ if(response || status & AsyncStatusQueued)
+ {
+ out->cancelable(this); // Notify the request that it's cancelable
+ }
+
if(response)
{
//
@@ -961,7 +967,7 @@ Ice::ConnectionI::abortBatchRequest()
void
Ice::ConnectionI::flushBatchRequests()
{
- BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name);
+ FlushBatch out(this, _instance.get(), __flushBatchRequests_name);
out.invoke();
}
@@ -986,9 +992,8 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR
#ifdef ICE_CPP11
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(
- const IceInternal::Function<void (const Exception&)>& exception,
- const IceInternal::Function<void (bool)>& sent)
+Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception,
+ const IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
@@ -1026,16 +1031,13 @@ Ice::ConnectionI::begin_flushBatchRequests(
AsyncResultPtr
Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
- ConnectionBatchOutgoingAsyncPtr result =
- new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie);
- try
- {
- result->__invoke();
- }
- catch(const LocalException& __ex)
- {
- result->__invokeExceptionAsync(__ex);
- }
+ ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this,
+ _communicator,
+ _instance,
+ __flushBatchRequests_name,
+ cb,
+ cookie);
+ result->invoke();
return result;
}
@@ -1047,7 +1049,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
}
bool
-Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
+Ice::ConnectionI::flushBatchRequests(OutgoingBase* out)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
while(_batchStreamInUse && !_exception.get())
@@ -1075,12 +1077,10 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
-
- out->attachRemoteObserver(initConnectionInfo(), _endpoint,
- static_cast<Int>(_batchStream.b.size() - headerSize - 4));
-
_batchStream.swap(*out->os());
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
+
//
// Send the batch stream.
//
@@ -1109,7 +1109,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
}
AsyncStatus
-Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
+Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
while(_batchStreamInUse && !_exception.get())
@@ -1125,7 +1125,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
- if(outAsync->__sent())
+ if(outAsync->sent())
{
status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
}
@@ -1141,11 +1141,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
+ _batchStream.swap(*outAsync->getOs());
- outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0,
- static_cast<Int>(_batchStream.b.size() - headerSize - 4));
-
- _batchStream.swap(*outAsync->__getOs());
+ outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
//
// Send the batch stream.
@@ -1153,7 +1151,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
AsyncStatus status = AsyncStatusQueued;
try
{
- OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0);
+ OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0);
status = sendMessage(message);
}
catch(const Ice::LocalException& ex)
@@ -1163,6 +1161,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_exception->ice_throw();
}
+ if(status & AsyncStatusQueued)
+ {
+ outAsync->cancelable(this); // Notify the request that it's cancelable.
+ }
+
//
// Reset the batch stream.
//
@@ -1276,9 +1279,14 @@ Ice::ConnectionI::getACM()
}
void
-Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
+Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
if(o->out == out)
@@ -1302,16 +1310,15 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
//
if(o == _sendStreams.begin())
{
- o->timedOut(true); // true = adopt the stream.
+ o->canceled(true); // true = adopt the stream.
}
else
{
- o->timedOut(false);
+ o->canceled(false);
_sendStreams.erase(o);
}
- InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex);
+ out->completed(ex);
return;
}
}
@@ -1321,8 +1328,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
{
if(_requestsHint != _requests.end() && _requestsHint->second == o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
_requests.erase(_requestsHint);
_requestsHint = _requests.end();
}
@@ -1332,8 +1338,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
{
if(p->second == o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
assert(p != _requestsHint);
_requests.erase(p);
return; // We're done.
@@ -1344,10 +1349,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
}
void
-Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ //
+ // NOTE: This isn't called from a thread pool thread.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
if(o->outAsync.get() == outAsync.get())
@@ -1365,25 +1378,29 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
_asyncRequests.erase(o->requestId);
}
}
-
+
//
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
if(o == _sendStreams.begin())
{
- o->timedOut(true); // true = adopt the stream
+ o->canceled(true); // true = adopt the stream
}
else
{
- o->timedOut(false);
+ o->canceled(false);
_sendStreams.erase(o);
}
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ sync.release();
+ outAsync->invokeCompleted();
+ }
+ return;
}
}
-
+
OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
if(o)
{
@@ -1393,19 +1410,25 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ return;
}
}
-
+
for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
{
if(p->second.get() == o.get())
{
assert(p != _asyncRequestsHint);
_asyncRequests.erase(p);
- outAsync->__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
+ return;
}
}
}
@@ -1972,18 +1995,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
if(p->invokeSent)
{
- p->outAsync->__invokeSent();
+ p->outAsync->invokeSent();
}
if(p->receivedReply)
{
OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
- if(outAsync->__finished())
+ if(outAsync->completed())
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
}
#else
- p->outAsync->__invokeSent();
+ p->outAsync->invokeSent();
#endif
}
++dispatchedCount;
@@ -1995,7 +2018,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
++dispatchedCount;
}
@@ -2147,14 +2170,14 @@ Ice::ConnectionI::finish()
{
if(message->sent() && message->invokeSent)
{
- message->outAsync->__invokeSent();
+ message->outAsync->invokeSent();
}
if(message->receivedReply)
{
OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
- if(outAsync->__finished())
+ if(outAsync->completed())
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
}
_sendStreams.pop_front();
@@ -2164,7 +2187,7 @@ Ice::ConnectionI::finish()
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
- o->finished(*_exception.get());
+ o->completed(*_exception.get());
if(o->requestId) // Make sure finished isn't called twice.
{
if(o->out)
@@ -2182,13 +2205,16 @@ Ice::ConnectionI::finish()
for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- p->second->finished(*_exception.get());
+ p->second->completed(*_exception.get());
}
_requests.clear();
for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- q->second->__finished(*_exception.get());
+ if(q->second->completed(*_exception.get()))
+ {
+ q->second->invokeCompleted();
+ }
}
_asyncRequests.clear();
@@ -3481,7 +3507,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(p != _requests.end())
{
- p->second->finished(stream);
+ p->second->completed(stream);
if(p == _requestsHint)
{
@@ -3508,7 +3534,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
_asyncRequests.erase(q);
}
- stream.swap(*outAsync->__getIs());
+ stream.swap(*outAsync->getIs());
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
//
@@ -3522,7 +3548,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
- else if(outAsync->__finished())
+ else if(outAsync->completed())
{
++dispatchCount;
}
@@ -3531,7 +3557,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
outAsync = 0;
}
#else
- if(outAsync->__finished())
+ if(outAsync->completed())
{
++dispatchCount;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 32c474d20da..3ecec79a247 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -30,6 +30,7 @@
#include <Ice/TraceLevelsF.h>
#include <Ice/OutgoingAsyncF.h>
#include <Ice/EventHandler.h>
+#include <Ice/RequestHandler.h>
#include <Ice/ResponseHandler.h>
#include <Ice/Dispatcher.h>
#include <Ice/ObserverHelper.h>
@@ -43,8 +44,7 @@ namespace IceInternal
{
class Outgoing;
-class BatchOutgoing;
-class OutgoingMessageCallback;
+class OutgoingBase;
}
@@ -56,6 +56,7 @@ class LocalException;
class ConnectionI : public Connection,
public IceInternal::EventHandler,
public IceInternal::ResponseHandler,
+ public IceInternal::CancellationHandler,
public IceUtil::Monitor<IceUtil::Mutex>
{
class Observer : public IceInternal::ObserverHelperT<Ice::Instrumentation::ConnectionObserver>
@@ -89,7 +90,7 @@ public:
{
}
- OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) :
+ OutgoingMessage(IceInternal::OutgoingBase* o, IceInternal::BasicStream* str, bool comp, int rid) :
stream(str), out(o), compress(comp), requestId(rid), adopted(false)
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
, isSent(false), invokeSent(false), receivedReply(false)
@@ -97,7 +98,7 @@ public:
{
}
- OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str,
+ OutgoingMessage(const IceInternal::OutgoingAsyncBasePtr& o, IceInternal::BasicStream* str,
bool comp, int rid) :
stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false)
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
@@ -107,13 +108,13 @@ public:
}
void adopt(IceInternal::BasicStream*);
- void timedOut(bool);
+ void canceled(bool);
bool sent();
- void finished(const Ice::LocalException&);
+ void completed(const Ice::LocalException&);
IceInternal::BasicStream* stream;
- IceInternal::OutgoingMessageCallback* out;
- IceInternal::OutgoingAsyncMessageCallbackPtr outAsync;
+ IceInternal::OutgoingBase* out;
+ IceInternal::OutgoingAsyncBasePtr outAsync;
bool compress;
int requestId;
bool adopted;
@@ -178,8 +179,8 @@ public:
virtual void end_flushBatchRequests(const AsyncResultPtr&);
- bool flushBatchRequests(IceInternal::BatchOutgoing*);
- IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&);
+ bool flushBatchRequests(IceInternal::OutgoingBase*);
+ IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::OutgoingAsyncBasePtr&);
virtual void setCallback(const ConnectionCallbackPtr&);
virtual void setACM(const IceUtil::Optional<int>&,
@@ -187,8 +188,8 @@ public:
const IceUtil::Optional<ACMHeartbeat>&);
virtual ACM getACM();
- void requestTimedOut(IceInternal::OutgoingMessageCallback*);
- void asyncRequestTimedOut(const IceInternal::OutgoingAsyncMessageCallbackPtr&);
+ virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&);
+ virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&);
virtual void sendResponse(Int, IceInternal::BasicStream*, Byte);
virtual void sendNoResponse();
diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp
index 43dda1b88ed..a94d3e7180a 100644
--- a/cpp/src/Ice/ConnectionRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectionRequestHandler.cpp
@@ -91,27 +91,27 @@ ConnectionRequestHandler::abortBatchRequest()
}
bool
-ConnectionRequestHandler::sendRequest(OutgoingMessageCallback* out)
+ConnectionRequestHandler::sendRequest(OutgoingBase* out)
{
return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response
}
AsyncStatus
-ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out)
+ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out)
{
- return out->__send(_connection, _compress, _response);
+ return out->send(_connection, _compress, _response);
}
void
-ConnectionRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
+ConnectionRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
{
- _connection->requestTimedOut(out);
+ _connection->requestCanceled(out, ex);
}
void
-ConnectionRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+ConnectionRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex)
{
- _connection->asyncRequestTimedOut(outAsync);
+ _connection->asyncRequestCanceled(outAsync, ex);
}
Ice::ConnectionIPtr
diff --git a/cpp/src/Ice/ConnectionRequestHandler.h b/cpp/src/Ice/ConnectionRequestHandler.h
index 5ab5a4c9ea7..211e8f02819 100644
--- a/cpp/src/Ice/ConnectionRequestHandler.h
+++ b/cpp/src/Ice/ConnectionRequestHandler.h
@@ -31,11 +31,11 @@ public:
virtual void finishBatchRequest(BasicStream*);
virtual void abortBatchRequest();
- virtual bool sendRequest(OutgoingMessageCallback*);
- virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&);
+ virtual bool sendRequest(OutgoingBase*);
+ virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&);
- virtual void requestTimedOut(OutgoingMessageCallback*);
- virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&);
+ virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
+ virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
virtual Ice::ConnectionIPtr getConnection();
virtual Ice::ConnectionIPtr waitForConnection();
diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp
index a8bb8c6f81a..e5a3aed5b9f 100644
--- a/cpp/src/Ice/Exception.cpp
+++ b/cpp/src/Ice/Exception.cpp
@@ -502,6 +502,13 @@ Ice::InvocationTimeoutException::ice_print(ostream& out) const
}
void
+Ice::InvocationCanceledException::ice_print(ostream& out) const
+{
+ Exception::ice_print(out);
+ out << ":\ninvocation canceled";
+}
+
+void
Ice::ProtocolException::ice_print(ostream& out) const
{
Exception::ice_print(out);
diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile
index 1398ab5d07a..f44e0c977a8 100644
--- a/cpp/src/Ice/Makefile
+++ b/cpp/src/Ice/Makefile
@@ -57,6 +57,7 @@ SLICE_OBJS = BuiltinSequences.o \
OBJS = Acceptor.o \
ACM.o \
Application.o \
+ AsyncResult.o \
Base64.o \
BasicStream.o \
Buffer.o \
diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak
index 75cf8c26a68..fcc4d3eb3e9 100644
--- a/cpp/src/Ice/Makefile.mak
+++ b/cpp/src/Ice/Makefile.mak
@@ -59,6 +59,7 @@ WINDOWS_OBJS = .\DLLMain.obj
OBJS = .\Acceptor.obj \
.\ACM.obj \
.\Application.obj \
+ .\AsyncResult.obj \
.\Base64.obj \
.\BasicStream.obj \
.\Buffer.obj \
diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp
index ca7938bcc55..35ca16ea03e 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.cpp
+++ b/cpp/src/Ice/ObjectAdapterFactory.cpp
@@ -211,7 +211,7 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a
}
void
-IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) const
+IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) const
{
list<ObjectAdapterIPtr> adapters;
{
diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h
index 7faa8ef73f4..37b3853497c 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.h
+++ b/cpp/src/Ice/ObjectAdapterFactory.h
@@ -33,7 +33,7 @@ public:
::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrx&);
::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrx&);
void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&) const;
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&) const;
private:
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index 422cdb8fd01..c9a05c091ca 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -743,7 +743,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrx& proxy) const
}
void
-Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync)
+Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync)
{
vector<IncomingConnectionFactoryPtr> f;
{
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index 3897c0ef557..30f499d96c0 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -87,7 +87,7 @@ public:
bool isLocal(const ObjectPrx&) const;
- void flushAsyncBatchRequests(const IceInternal::CommunicatorBatchOutgoingAsyncPtr&);
+ void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchPtr&);
void updateConnectionObservers();
void updateThreadObservers();
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 66509a2bedc..4815b9796fb 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -7,36 +7,44 @@
//
// **********************************************************************
+#include <IceUtil/DisableWarnings.h>
#include <Ice/Outgoing.h>
-#include <Ice/Object.h>
-#include <Ice/CollocatedRequestHandler.h>
#include <Ice/ConnectionI.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/Reference.h>
-#include <Ice/Endpoint.h>
-#include <Ice/LocalException.h>
-#include <Ice/Protocol.h>
#include <Ice/Instance.h>
+#include <Ice/LocalException.h>
#include <Ice/ReplyStatus.h>
-#include <Ice/ProxyFactory.h>
+#include <Ice/ImplicitContextI.h>
using namespace std;
using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
-IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode,
- const Context* context) :
+OutgoingBase::OutgoingBase(Instance* instance, const string& operation) :
+ _os(instance, Ice::currentProtocolEncoding), _sent(false)
+{
+}
+
+Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) :
+ OutgoingBase(proxy->__reference()->getInstance().get(), operation),
_proxy(proxy),
_mode(mode),
- _observer(proxy, operation, context),
_state(StateUnsent),
_encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())),
- _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _sent(false)
+ _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding)
{
checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
+ _observer.attach(proxy, operation, context);
+
+ int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(invocationTimeout);
+ }
+
switch(_proxy->__reference()->getMode())
{
case Reference::ModeTwoway:
@@ -129,7 +137,66 @@ Outgoing::~Outgoing()
}
bool
-IceInternal::Outgoing::invoke()
+Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+{
+ return connection->sendRequest(this, compress, response);
+}
+
+void
+Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ handler->invokeRequest(this);
+}
+
+void
+Outgoing::sent()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
+ {
+ _childObserver.detach();
+ _state = StateOK;
+ }
+ _sent = true;
+ _monitor.notify();
+
+ //
+ // NOTE: At this point the stack allocated Outgoing object can be destroyed
+ // since the notify() on the monitor will release the thread waiting on the
+ // synchronous Ice call.
+ //
+}
+
+void
+Outgoing::completed(const Exception& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ //assert(_state <= StateInProgress);
+ if(_state > StateInProgress)
+ {
+ //
+ // Response was already received but message
+ // didn't get removed first from the connection
+ // send message queue so it's possible we can be
+ // notified of failures. In this case, ignore the
+ // failure and assume the outgoing has been sent.
+ //
+ assert(_state != StateFailed);
+ _sent = true;
+ _monitor.notify();
+ return;
+ }
+
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+
+ _state = StateFailed;
+ _exception.reset(ex.ice_clone());
+ _monitor.notify();
+}
+
+bool
+Outgoing::invoke()
{
assert(_state == StateUnsent);
@@ -146,6 +213,11 @@ IceInternal::Outgoing::invoke()
{
try
{
+ if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now())
+ {
+ throw Ice::InvocationTimeoutException(__FILE__, __LINE__);
+ }
+
_state = StateInProgress;
_exception.reset(0);
_sent = false;
@@ -164,19 +236,18 @@ IceInternal::Outgoing::invoke()
//
// If the handler says it's not finished, we wait until we're done.
//
- int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
- if(invocationTimeout > 0)
+ if(_invocationTimeoutDeadline != IceUtil::Time())
{
IceUtil::Time now = IceUtil::Time::now();
- IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout);
+ timedOut = now >= _invocationTimeoutDeadline;
while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
{
- _monitor.timedWait(deadline - now);
+ _monitor.timedWait(_invocationTimeoutDeadline - now);
if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
now = IceUtil::Time::now();
- timedOut = now >= deadline;
+ timedOut = now >= _invocationTimeoutDeadline;
}
}
}
@@ -191,15 +262,15 @@ IceInternal::Outgoing::invoke()
if(timedOut)
{
- _handler->requestTimedOut(this);
+ _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__));
//
// Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
+ // the timeout if there was a failure shortly before requestCanceled got called.
// In this case, the exception should be set on the Outgoing.
//
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!_exception.get())
+ while(_state == StateInProgress)
{
_monitor.wait();
}
@@ -223,11 +294,23 @@ IceInternal::Outgoing::invoke()
{
try
{
- int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt);
- _observer.retried(); // Invocation is being retried.
- if(interval > 0)
+ IceUtil::Time interval;
+ interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt));
+ if(interval > IceUtil::Time())
+ {
+ if(_invocationTimeoutDeadline != IceUtil::Time())
+ {
+ IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now();
+ if(deadline < interval)
+ {
+ interval = deadline;
+ }
+ }
+ IceUtil::ThreadControl::sleep(interval);
+ }
+ if(_invocationTimeoutDeadline == IceUtil::Time() || _invocationTimeoutDeadline > IceUtil::Time::now())
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval));
+ _observer.retried();
}
}
catch(const Ice::Exception& ex)
@@ -243,7 +326,7 @@ IceInternal::Outgoing::invoke()
}
void
-IceInternal::Outgoing::abort(const LocalException& ex)
+Outgoing::abort(const LocalException& ex)
{
assert(_state == StateUnsent);
@@ -261,67 +344,8 @@ IceInternal::Outgoing::abort(const LocalException& ex)
ex.ice_throw();
}
-bool
-IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
-{
- return connection->sendRequest(this, compress, response);
-}
-
-void
-IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
-{
- handler->invokeRequest(this);
-}
-
-void
-IceInternal::Outgoing::sent()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
- {
- _childObserver.detach();
- _state = StateOK;
- }
- _sent = true;
- _monitor.notify();
-
- //
- // NOTE: At this point the stack allocated Outgoing object can be destroyed
- // since the notify() on the monitor will release the thread waiting on the
- // synchronous Ice call.
- //
-}
-
void
-IceInternal::Outgoing::finished(const Exception& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- //assert(_state <= StateInProgress);
- if(_state > StateInProgress)
- {
- //
- // Response was already received but message
- // didn't get removed first from the connection
- // send message queue so it's possible we can be
- // notified of failures. In this case, ignore the
- // failure and assume the outgoing has been sent.
- //
- assert(_state != StateFailed);
- _sent = true;
- _monitor.notify();
- return;
- }
-
- _childObserver.failed(ex.ice_name());
- _childObserver.detach();
-
- _state = StateFailed;
- _exception.reset(ex.ice_clone());
- _monitor.notify();
-}
-
-void
-IceInternal::Outgoing::finished(BasicStream& is)
+Outgoing::completed(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -482,7 +506,7 @@ IceInternal::Outgoing::finished(BasicStream& is)
}
void
-IceInternal::Outgoing::throwUserException()
+Outgoing::throwUserException()
{
try
{
@@ -496,27 +520,22 @@ IceInternal::Outgoing::throwUserException()
}
}
-IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) :
- _proxy(proxy),
- _connection(0),
- _sent(false),
- _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _observer(proxy, name, 0)
+FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) :
+ OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0)
{
checkSupportedProtocol(proxy->__reference()->getProtocol());
+
+ _observer.attach(proxy->__reference()->getInstance().get(), operation);
}
-IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) :
- _proxy(0),
- _connection(connection),
- _sent(false),
- _os(instance, Ice::currentProtocolEncoding),
- _observer(instance, name)
+FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) :
+ OutgoingBase(instance, operation), _proxy(0), _connection(connection)
{
+ _observer.attach(instance, operation);
}
void
-IceInternal::BatchOutgoing::invoke()
+FlushBatch::invoke()
{
assert(_proxy || _connection);
@@ -577,7 +596,8 @@ IceInternal::BatchOutgoing::invoke()
if(timedOut)
{
- handler->requestTimedOut(this);
+ Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
+ handler->requestCanceled(this, ex);
//
// Wait for the exception to propagate. It's possible the request handler ignores
@@ -614,19 +634,19 @@ IceInternal::BatchOutgoing::invoke()
}
bool
-IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool)
+FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool)
{
return connection->flushBatchRequests(this);
}
void
-IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler)
+FlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
{
handler->invokeBatchRequests(this);
}
void
-IceInternal::BatchOutgoing::sent()
+FlushBatch::sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_childObserver.detach();
@@ -635,14 +655,14 @@ IceInternal::BatchOutgoing::sent()
_monitor.notify();
//
- // NOTE: At this point the stack allocated BatchOutgoing object
+ // NOTE: At this point the stack allocated FlushBatch object
// can be destroyed since the notify() on the monitor will release
// the thread waiting on the synchronous Ice call.
//
}
void
-IceInternal::BatchOutgoing::finished(const Ice::Exception& ex)
+FlushBatch::completed(const Ice::Exception& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_childObserver.failed(ex.ice_name());
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index fb57965b60f..b348b1c8cc8 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -9,18 +9,11 @@
#include <IceUtil/DisableWarnings.h>
#include <Ice/OutgoingAsync.h>
-#include <Ice/Object.h>
#include <Ice/ConnectionI.h>
#include <Ice/CollocatedRequestHandler.h>
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/LocalException.h>
-#include <Ice/Properties.h>
-#include <Ice/LoggerUtil.h>
-#include <Ice/LocatorInfo.h>
-#include <Ice/ProxyFactory.h>
-#include <Ice/RouterInfo.h>
-#include <Ice/Protocol.h>
#include <Ice/ReplyStatus.h>
#include <Ice/ImplicitContextI.h>
#include <Ice/ThreadPool.h>
@@ -30,431 +23,289 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; }
-
-IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; }
-
-const unsigned char Ice::AsyncResult::OK = 0x1;
-const unsigned char Ice::AsyncResult::Done = 0x2;
-const unsigned char Ice::AsyncResult::Sent = 0x4;
-const unsigned char Ice::AsyncResult::EndCalled = 0x8;
-
-namespace
-{
-
-class AsynchronousException : public DispatchWorkItem
-{
-public:
-
- AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result,
- const Ice::Exception& ex) :
- DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone())
- {
- }
-
- virtual void
- run()
- {
- _result->__invokeException(*_exception.get());
- }
-
-private:
-
- const Ice::AsyncResultPtr _result;
- const IceUtil::UniquePtr<Ice::Exception> _exception;
-};
-
-class AsynchronousSent : public DispatchWorkItem
-{
-public:
-
- AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) :
- DispatchWorkItem(connection), _result(result)
- {
- }
-
- virtual void
- run()
- {
- _result->__invokeSent();
- }
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; }
-private:
-
- const Ice::AsyncResultPtr _result;
-};
-
-};
-
-Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
- const IceInternal::InstancePtr& instance,
- const string& op,
- const CallbackBasePtr& del,
- const LocalObjectPtr& cookie) :
- _communicator(communicator),
- _instance(instance),
- _operation(op),
- _callback(del),
- _cookie(cookie),
- _is(instance.get(), Ice::currentProtocolEncoding),
- _os(instance.get(), Ice::currentProtocolEncoding),
- _state(0),
- _sentSynchronously(false),
- _exception(0)
+bool
+OutgoingAsyncBase::sent()
{
- if(!_callback)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
- }
- const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie);
+ return sent(true);
}
-Ice::AsyncResult::~AsyncResult()
+bool
+OutgoingAsyncBase::completed(const Exception& ex)
{
+ return finished(ex);
}
-Int
-Ice::AsyncResult::getHash() const
+OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ AsyncResult(communicator, instance, operation, delegate, cookie),
+ _os(instance.get(), Ice::currentProtocolEncoding)
{
- return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
}
bool
-Ice::AsyncResult::isCompleted() const
+OutgoingAsyncBase::sent(bool done)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- return _state & Done;
-}
-
-void
-Ice::AsyncResult::waitForCompleted()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!(_state & Done))
+ if(done)
{
- _monitor.wait();
+ _childObserver.detach();
}
+ return AsyncResult::sent(done);
}
bool
-Ice::AsyncResult::isSent() const
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- return _state & Sent;
-}
-
-void
-Ice::AsyncResult::waitForSent()
+OutgoingAsyncBase::finished(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!(_state & Sent) && !_exception.get())
+ if(_childObserver)
{
- _monitor.wait();
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
}
+ return AsyncResult::finished(ex);
}
-void
-Ice::AsyncResult::throwLocalException() const
+Ice::ObjectPrx
+ProxyOutgoingAsyncBase::getProxy() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_exception.get())
- {
- _exception.get()->ice_throw();
- }
+ return _proxy;
}
bool
-Ice::AsyncResult::__wait()
+ProxyOutgoingAsyncBase::sent()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_state & EndCalled)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
- }
- _state |= EndCalled;
- while(!(_state & Done))
- {
- _monitor.wait();
- }
- if(_exception.get())
- {
- _exception.get()->ice_throw();
- }
- return _state & OK;
+ return sent(!_proxy->ice_isTwoway()); // Done if it's not a two-way proxy (no response expected).
}
-void
-Ice::AsyncResult::__throwUserException()
+bool
+ProxyOutgoingAsyncBase::completed(const Exception& exc)
{
- try
- {
- _is.startReadEncaps();
- _is.throwException();
- }
- catch(const Ice::UserException&)
+ if(_childObserver)
{
- _is.endReadEncaps();
- throw;
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
}
-}
-void
-Ice::AsyncResult::__invokeSent()
-{
//
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
//
-
- if(_callback)
+ try
{
- try
- {
- AsyncResultPtr self(this);
- _callback->sent(self);
- }
- catch(const std::exception& ex)
- {
- __warning(ex);
- }
- catch(...)
- {
- __warning();
- }
+ _instance->retryQueue()->add(this, handleException(exc));
+ return false;
}
-
- if(_observer)
+ catch(const Exception& ex)
{
- Ice::ObjectPrx proxy = getProxy();
- if(!proxy || !proxy->ice_isTwoway())
- {
- _observer.detach();
- }
+ return finished(ex); // No retries, we're done
}
}
void
-Ice::AsyncResult::__invokeSentAsync()
+ProxyOutgoingAsyncBase::retry()
{
- //
- // This is called when it's not safe to call the sent callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this));
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- }
+ invokeImpl(false);
}
void
-Ice::AsyncResult::__invokeException(const Ice::Exception& ex)
+ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex)
{
+ assert(!_childObserver);
+
+ if(finished(ex))
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception.reset(ex.ice_clone());
- _monitor.notifyAll();
+ invokeCompletedAsync();
+ }
+ else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex))
+ {
+ //
+ // If it's a communicator destroyed exception, don't swallow
+ // it but instead notify the user thread. Even if no callback
+ // was provided.
+ //
+ ex.ice_throw();
}
-
- __invokeCompleted();
}
-void
-Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
+ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
+ _proxy(prx),
+ _mode(Normal),
+ _cnt(0),
+ _sent(false)
{
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- // CommunicatorDestroyedException is the only exception that can propagate directly
- // from this method.
- //
- _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex));
}
void
-Ice::AsyncResult::__invokeCompleted()
+ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback)
+ try
{
- try
+ if(userThread)
{
- AsyncResultPtr self(this);
- _callback->completed(self);
+ int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+ }
}
- catch(const std::exception& ex)
+ else
{
- __warning(ex);
+ checkCanceled(); // Cancellation exception aren't retriable
+ _observer.retried();
}
- catch(...)
+
+ while(true)
{
- __warning();
+ try
+ {
+ _sent = false;
+ _handler = _proxy->__getRequestHandler();
+ AsyncStatus status = _handler->sendAsyncRequest(this);
+ if(status & AsyncStatusSent)
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent(); // Call the sent callback from the user thread.
+ }
+ }
+ else
+ {
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSentAsync(); // Call the sent callback from a client thread pool thread.
+ }
+ }
+ }
+ return; // We're done!
+ }
+ catch(const RetryException& ex)
+ {
+ handleRetryException(ex);
+ }
+ catch(const Exception& ex)
+ {
+ if(_childObserver)
+ {
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ }
+ int interval = handleException(ex);
+ if(interval > 0)
+ {
+ _instance->retryQueue()->add(this, interval);
+ return;
+ }
+ else
+ {
+ checkCanceled(); // Cancellation exception aren't retriable
+ _observer.retried();
+ }
+ }
}
}
-
- _observer.detach();
-}
-
-void
-Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
-{
- RequestHandlerPtr handler;
+ catch(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- swap(handler, _timeoutRequestHandler);
- }
-
- if(handler)
- {
- handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this));
+ //
+ // If called from the user thread we re-throw, the exception
+ // will be catch by the caller and abort() will be called.
+ //
+ if(userThread)
+ {
+ throw;
+ }
+ else if(finished(ex)) // No retries, we're done
+ {
+ invokeCompletedAsync();
+ }
}
}
-
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
+bool
+ProxyOutgoingAsyncBase::sent(bool done)
{
- __check(r, operation);
- if(r->getProxy().get() != prx)
+ _sent = true;
+ if(done)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
+ {
+ _instance->timer()->cancel(this);
+ }
}
+ return OutgoingAsyncBase::sent(done);
}
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
+bool
+ProxyOutgoingAsyncBase::finished(const Exception& ex)
{
- __check(r, operation);
- if(r->getCommunicator().get() != com)
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
- " does not match communicator that was used to call corresponding " +
- "begin_" + operation + " method");
+ _instance->timer()->cancel(this);
}
+ return OutgoingAsyncBase::finished(ex);
}
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
+bool
+ProxyOutgoingAsyncBase::finished(bool ok)
{
- __check(r, operation);
- if(r->getConnection().get() != con)
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
- " does not match connection that was used to call corresponding " +
- "begin_" + operation + " method");
+ _instance->timer()->cancel(this);
}
+ return AsyncResult::finished(ok);
}
void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
+ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc)
{
- if(!r)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
- }
- else if(&r->_operation != &operation)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
- " method: " + r->_operation);
- }
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry.
}
-
-void
-Ice::AsyncResult::__warning(const std::exception& exc) const
+int
+ProxyOutgoingAsyncBase::handleException(const Exception& exc)
{
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
- {
- Warning out(_instance->initializationData().logger);
- const Exception* ex = dynamic_cast<const Exception*>(&exc);
- if(ex)
- {
- out << "Ice::Exception raised by AMI callback:\n" << *ex;
- }
- else
- {
- out << "std::exception raised by AMI callback:\n" << exc.what();
- }
- }
+ return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
}
void
-Ice::AsyncResult::__warning() const
+ProxyOutgoingAsyncBase::runTimerTask()
{
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ try
{
- Warning out(_instance->initializationData().logger);
- out << "unknown exception raised by AMI callback";
+ cancel(InvocationTimeoutException(__FILE__, __LINE__));
}
-}
-
-void
-IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool,
- const Ice::ConnectionPtr& connection)
-{
- class InvocationTimeoutCall : public DispatchWorkItem
+ catch(const CommunicatorDestroyedException&)
{
- public:
-
- InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) :
- DispatchWorkItem(connection), _outAsync(outAsync)
- {
- }
-
- virtual void
- run()
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- _outAsync->__finished(ex);
- }
-
- private:
-
- const OutgoingAsyncMessageCallbackPtr _outAsync;
- };
- threadPool->dispatch(new InvocationTimeoutCall(this, connection));
+ }
}
-IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(prx),
+OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate, cookie),
_encoding(getCompatibleEncoding(prx->__reference()->getEncoding()))
{
}
void
-IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
+OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context)
{
- _handler = 0;
- _cnt = 0;
- _sent = false;
- _mode = mode;
- _sentSynchronously = false;
-
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
+ _mode = mode;
_observer.attach(_proxy.get(), operation, context);
switch(_proxy->__reference()->getMode())
@@ -482,7 +333,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
}
- catch(const Ice::LocalException& ex)
+ catch(const LocalException& ex)
{
_observer.failed(ex.ice_name());
_proxy->__setRequestHandler(_handler, 0); // Clear request handler
@@ -541,109 +392,63 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
}
AsyncStatus
-IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response)
{
_cachedConnection = connection;
return connection->sendAsyncRequest(this, compress, response);
}
AsyncStatus
-IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler)
{
return handler->invokeAsyncRequest(this);
}
-bool
-IceInternal::OutgoingAsync::__sent()
+void
+OutgoingAsync::abort(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
- bool alreadySent = _state & Sent; // Expected in case of a retry.
- _state |= Sent;
- _sent = true;
-
- assert(!(_state & Done));
- if(!_proxy->ice_isTwoway())
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- _childObserver.detach();
- if(!_callback || !_callback->hasSentCallback())
+ if(_handler)
{
- _observer.detach();
- }
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _handler->abortBatchRequest();
}
- _state |= Done | OK;
- //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
}
- _monitor.notifyAll();
- return !alreadySent && _callback && _callback->hasSentCallback();
-}
-
-void
-IceInternal::OutgoingAsync::__invokeSent()
-{
- ::Ice::AsyncResult::__invokeSent();
+
+ ProxyOutgoingAsyncBase::abort(ex);
}
void
-IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
+OutgoingAsync::invoke()
{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!(_state & Done));
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- if(_timeoutRequestHandler)
+ if(_handler)
{
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
+ _sentSynchronously = true;
+ _handler->finishBatchRequest(&_os);
+ finished(true);
}
+ return; // Don't call sent/completed callback for batch AMI requests
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
//
- try
- {
- handleException(exc);
- }
- catch(const Ice::Exception& ex)
- {
- __invokeException(ex);
- }
-}
-
-void
-IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
-{
- if((_state & Done) == 0 && _handler)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- int mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _handler->abortBatchRequest();
- }
- }
- AsyncResult::__invokeExceptionAsync(ex);
-}
-
-void
-IceInternal::OutgoingAsync::__processRetry()
-{
- __invoke(false);
+ invokeImpl(true); // userThread = true
}
bool
-IceInternal::OutgoingAsync::__finished()
+OutgoingAsync::completed()
{
//
// NOTE: this method is called from ConnectionI.parseMessage
@@ -652,25 +457,15 @@ IceInternal::OutgoingAsync::__finished()
//
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
- Ice::Byte replyStatus;
- try
+ if(_childObserver)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!_exception.get() && !(_state & Done));
- assert(!_is.b.empty());
-
- if(_childObserver)
- {
- _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
- }
+ _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
_childObserver.detach();
+ }
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
-
+ Byte replyStatus;
+ try
+ {
_is.read(replyStatus);
switch(replyStatus)
@@ -789,373 +584,197 @@ IceInternal::OutgoingAsync::__finished()
}
}
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- if(replyStatus == replyOK)
- {
- _state |= OK;
- }
- _monitor.notifyAll();
-
- if(!_callback)
- {
- _observer.detach();
- return false;
- }
- return true;
+ return finished(replyStatus == replyOK);
}
- catch(const LocalException& exc)
+ catch(const Exception& ex)
{
- //
- // We don't call finished(exc) here because we don't want
- // to invoke the completion callback. The completion
- // callback is invoked by the connection is this method
- // returns true.
- //
- try
- {
- handleException(exc);
- return false; // Invocation will be retried.
- }
- catch(const Ice::Exception& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception.reset(ex.ice_clone());
- _monitor.notifyAll();
-
- if(!_callback)
- {
- _observer.detach();
- return false;
- }
- return true;
- }
+ return completed(ex);
}
}
+ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie)
+{
+ _observer.attach(proxy.get(), operation, 0);
+}
+
bool
-IceInternal::OutgoingAsync::__invoke(bool userThread)
+ProxyFlushBatch::sent()
{
- const Reference::Mode mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _state |= Done | OK;
- _handler->finishBatchRequest(&_os);
- _observer.detach();
- return true;
- }
+ return ProxyOutgoingAsyncBase::sent(true); // Overriden because the flush is done even if using a two-way proxy.
+}
- while(true)
- {
- try
- {
- _sent = false;
- _handler = _proxy->__getRequestHandler();
- AsyncStatus status = _handler->sendAsyncRequest(this);
- if(status & AsyncStatusSent)
- {
- if(userThread)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
- {
- __invokeSent(); // Call the sent callback from the user thread.
- }
- }
- else
- {
- if(status & AsyncStatusInvokeSentCallback)
- {
- __invokeSentAsync(); // Call the sent callback from a client thread pool thread.
- }
- }
- }
+AsyncStatus
+ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool)
+{
+ _cachedConnection = connection;
+ return connection->flushAsyncBatchRequests(this);
+}
- if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent))
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(!(_state & Done))
- {
- int invocationTimeout = _handler->getReference()->getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
- _timeoutRequestHandler = _handler;
- }
- }
- }
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
- continue;
- }
- catch(const Ice::Exception& ex)
- {
- handleException(ex);
- }
- break;
- }
- return _sentSynchronously;
+AsyncStatus
+ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ return handler->invokeAsyncBatchRequests(this);
}
void
-IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
+ProxyFlushBatch::invoke()
{
- try
- {
- int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
- _observer.retried(); // Invocation is being retried.
-
- //
- // Schedule the retry. Note that we always schedule the retry
- // on the retry queue even if the invocation can be retried
- // immediately. This is required because it might not be safe
- // to retry from this thread (this is for instance called by
- // finished(BasicStream) which is called with the connection
- // locked.
- //
- _instance->retryQueue()->add(this, interval);
- }
- catch(const Ice::Exception& ex)
- {
- _observer.failed(ex.ice_name());
- throw;
- }
+ checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
+ invokeImpl(true); // userThread = true
}
-IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, delegate, cookie)
+void
+ProxyFlushBatch::handleRetryException(const RetryException& ex)
{
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
}
-AsyncStatus
-IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool)
+int
+ProxyFlushBatch::handleException(const Exception& ex)
{
- _cachedConnection = connection;
- return connection->flushAsyncBatchRequests(this);
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
+ return 0;
}
-AsyncStatus
-IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate, cookie)
{
- return handler->invokeAsyncBatchRequests(this);
+ _observer.attach(prx.get(), operation, 0);
}
-bool
-IceInternal::BatchOutgoingAsync::__sent()
+AsyncStatus
+ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!_exception.get());
- _state |= Done | OK | Sent;
- //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
- _childObserver.detach();
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
- _monitor.notifyAll();
- if(!_callback || !_callback->hasSentCallback())
+ _cachedConnection = connection;
+ if(finished(true))
{
- _observer.detach();
- return false;
+ invokeCompletedAsync();
}
- return true;
+ return AsyncStatusSent;
}
-void
-IceInternal::BatchOutgoingAsync::__invokeSent()
+AsyncStatus
+ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
{
- ::Ice::AsyncResult::__invokeSent();
+ if(finished(true))
+ {
+ invokeCompletedAsync();
+ }
+ return AsyncStatusSent;
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc)
+ProxyGetConnection::invoke()
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
- }
- __invokeException(exc);
+ invokeImpl(true); // userThread = true
}
-void
-IceInternal::BatchOutgoingAsync::__processRetry()
+ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection)
{
- assert(false); // Retries are never scheduled
+ _observer.attach(instance.get(), operation);
}
-IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(proxy)
+ConnectionPtr
+ConnectionFlushBatch::getConnection() const
{
- _observer.attach(proxy.get(), operation, 0);
+ return _connection;
}
void
-IceInternal::ProxyBatchOutgoingAsync::__invoke()
+ConnectionFlushBatch::invoke()
{
- checkSupportedProtocol(_proxy->__reference()->getProtocol());
-
- RequestHandlerPtr handler;
try
{
- handler = _proxy->__getRequestHandler();
- AsyncStatus status = handler->sendAsyncRequest(this);
+ AsyncStatus status = _connection->flushAsyncBatchRequests(this);
if(status & AsyncStatusSent)
{
_sentSynchronously = true;
if(status & AsyncStatusInvokeSentCallback)
{
- __invokeSent();
- }
- }
- else
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(!(_state & Done))
- {
- int invocationTimeout = handler->getReference()->getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
- _timeoutRequestHandler = handler;
- }
+ invokeSent();
}
}
}
- catch(const RetryException&)
- {
- //
- // Clear request handler but don't retry or throw. Retrying
- // isn't useful, there were no batch requests associated with
- // the proxy's request handler.
- //
- _proxy->__setRequestHandler(handler, 0);
- }
- catch(const Ice::Exception& ex)
+ catch(const Exception& ex)
{
- _observer.failed(ex.ice_name());
- _proxy->__setRequestHandler(handler, 0); // Clear request handler
- throw; // Throw to notify the user that batch requests were potentially lost.
- }
-}
-
-IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con,
- const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(communicator, instance, operation, delegate, cookie),
- _connection(con)
-{
- _observer.attach(instance.get(), operation);
-}
-
-void
-IceInternal::ConnectionBatchOutgoingAsync::__invoke()
-{
- AsyncStatus status = _connection->flushAsyncBatchRequests(this);
- if(status & AsyncStatusSent)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
+ if(completed(ex))
{
- __invokeSent();
+ invokeCompletedAsync();
}
}
}
-Ice::ConnectionPtr
-IceInternal::ConnectionBatchOutgoingAsync::getConnection() const
+CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie) :
+ AsyncResult(communicator, instance, operation, cb, cookie)
{
- return _connection;
-}
+ _observer.attach(instance.get(), operation);
-IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, delegate, cookie)
-{
//
// _useCount is initialized to 1 to prevent premature callbacks.
// The caller must invoke ready() after all flush requests have
// been initiated.
//
_useCount = 1;
-
- //
- // Assume all connections are flushed synchronously.
- //
- _sentSynchronously = true;
-
- //
- // Attach observer
- //
- _observer.attach(instance.get(), operation);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con)
+CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
{
- class BatchOutgoingAsyncI : public BatchOutgoingAsync
+ class FlushBatch : public OutgoingAsyncBase
{
public:
-
- BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync,
- const InstancePtr& instance,
- InvocationObserver& observer) :
- BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
- _outAsync(outAsync), _observer(observer)
+
+ FlushBatch(const CommunicatorFlushBatchPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
+ _outAsync(outAsync),
+ _observer(observer)
{
}
- virtual bool __sent()
+ virtual bool sent()
{
_childObserver.detach();
_outAsync->check(false);
return false;
}
-#ifdef __SUNPRO_CC
- using BatchOutgoingAsync::__sent;
-#endif
-
- virtual void __finished(const Ice::Exception& ex)
+ virtual bool completed(const Exception& ex)
{
_childObserver.failed(ex.ice_name());
_childObserver.detach();
_outAsync->check(false);
+ return false;
}
- virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt,
- Ice::Int requestId, Ice::Int sz)
+ private:
+
+ virtual InvocationObserver& getObserver()
{
- _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz));
+ return _observer;
}
- private:
-
- const CommunicatorBatchOutgoingAsyncPtr _outAsync;
+ const CommunicatorFlushBatchPtr _outAsync;
InvocationObserver& _observer;
};
@@ -1166,13 +785,9 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
try
{
- AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer));
- if(!(status & AsyncStatusSent))
- {
- _sentSynchronously = false;
- }
+ con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer));
}
- catch(const Ice::LocalException&)
+ catch(const LocalException&)
{
check(false);
throw;
@@ -1180,19 +795,13 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::ready()
+CommunicatorFlushBatch::ready()
{
check(true);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::__processRetry()
-{
- assert(false); // Retries are never scheduled
-}
-
-void
-IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
+CommunicatorFlushBatch::check(bool userThread)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -1201,255 +810,18 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
{
return;
}
- _state |= Done | OK | Sent;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _monitor.notifyAll();
}
- if(!_callback || !_callback->hasSentCallback())
+ if(sent(true))
{
- _observer.detach();
- }
- else
- {
- //
- // _sentSynchronously is immutable here.
- //
- if(!_sentSynchronously || !userThread)
+ if(userThread)
{
- __invokeSentAsync();
+ _sentSynchronously = true;
+ invokeSent();
}
else
{
- AsyncResult::__invokeSent();
- }
- }
-}
-
-IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(prx),
- _cnt(0)
-{
- _observer.attach(prx.get(), operation, 0);
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__invoke()
-{
- while(true)
- {
- try
- {
- _handler = _proxy->__getRequestHandler();
- _handler->sendAsyncRequest(this);
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0);
- }
- catch(const Ice::Exception& ex)
- {
- handleException(ex);
+ invokeSentAsync();
}
- break;
- }
-}
-
-AsyncStatus
-IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool)
-{
- __sent();
- return AsyncStatusSent;
-}
-
-AsyncStatus
-IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*)
-{
- __sent();
- return AsyncStatusSent;
-}
-
-bool
-IceInternal::GetConnectionOutgoingAsync::__sent()
-{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _monitor.notifyAll();
- }
- __invokeCompleted();
- return false;
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__invokeSent()
-{
- // No sent callback
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc)
-{
- try
- {
- handleException(exc);
- }
- catch(const Ice::Exception& ex)
- {
- __invokeException(ex);
- }
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__processRetry()
-{
- __invoke();
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc)
-{
- try
- {
- _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt));
- _observer.retried(); // Invocation is being retried.
- }
- catch(const Ice::Exception& ex)
- {
- _observer.failed(ex.ice_name());
- throw;
- }
-}
-
-namespace
-{
-
-//
-// Dummy class derived from CallbackBase
-// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
-// this allows us to test whether the user supplied a null delegate instance to the
-// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
-// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
-// object code.
-//
-class DummyCallback : public CallbackBase
-{
-public:
-
- DummyCallback()
- {
- }
-
- virtual void
- completed(const Ice::AsyncResultPtr&) const
- {
- assert(false);
- }
-
- virtual CallbackBasePtr
- verify(const Ice::LocalObjectPtr&)
- {
- //
- // Called by the AsyncResult constructor to verify the delegate. The dummy
- // delegate is passed when the user used a begin_ method without delegate.
- // By returning 0 here, we tell the AsyncResult that no delegates was
- // provided.
- //
- return 0;
- }
-
- virtual void
- sent(const AsyncResultPtr&) const
- {
- assert(false);
- }
-
- virtual bool
- hasSentCallback() const
- {
- assert(false);
- return false;
- }
-};
-
-}
-
-//
-// This gives a pointer value to compare against in the generated
-// begin_ method to decide whether the caller passed a null pointer
-// versus the generated inline version of the begin_ method having
-// passed a pointer to the dummy delegate.
-//
-CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
-
-#ifdef ICE_CPP11
-
-Ice::CallbackPtr
-Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed,
- const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent)
-{
- class Cpp11CB : public GenericCallbackBase
- {
- public:
-
- Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed,
- const ::std::function<void (const AsyncResultPtr&)>& sent) :
- _completed(completed),
- _sent(sent)
- {
- checkCallback(true, completed != nullptr);
- }
-
- virtual void
- completed(const AsyncResultPtr& result) const
- {
- _completed(result);
- }
-
- virtual CallbackBasePtr
- verify(const LocalObjectPtr&)
- {
- return this; // Nothing to do, the cookie is not type-safe.
- }
-
- virtual void
- sent(const AsyncResultPtr& result) const
- {
- if(_sent != nullptr)
- {
- _sent(result);
- }
- }
-
- virtual bool
- hasSentCallback() const
- {
- return _sent != nullptr;
- }
-
- private:
-
- ::std::function< void (const AsyncResultPtr&)> _completed;
- ::std::function< void (const AsyncResultPtr&)> _sent;
- };
-
- return new Cpp11CB(completed, sent);
-}
-#endif
-
-void
-IceInternal::CallbackBase::checkCallback(bool obj, bool cb)
-{
- if(!obj)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null");
- }
- if(!cb)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null");
}
}
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index f455f5e6361..13bd0120a8f 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -217,15 +217,15 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId,
OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_isA_name, del, cookie);
try
{
- __result->__prepare(ice_isA_name, Nonmutating, ctx);
- IceInternal::BasicStream* __os = __result->__startWriteParams(DefaultFormat);
+ __result->prepare(ice_isA_name, Nonmutating, ctx);
+ IceInternal::BasicStream* __os = __result->startWriteParams(DefaultFormat);
__os->write(typeId);
- __result->__endWriteParams();
- __result->__invoke(true);
+ __result->endWriteParams();
+ __result->invoke();
}
catch(const Exception& __ex)
{
- __result->__invokeExceptionAsync(__ex);
+ __result->abort(__ex);
}
return __result;
}
@@ -233,12 +233,11 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId,
#ifdef ICE_CPP11
Ice::AsyncResultPtr
-IceProxy::Ice::Object::__begin_ice_isA(
- const ::std::string& typeId,
- const ::Ice::Context* ctx,
- const ::IceInternal::Function<void (bool)>& response,
- const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception,
- const ::IceInternal::Function<void (bool)>& sent)
+IceProxy::Ice::Object::__begin_ice_isA(const ::std::string& typeId,
+ const ::Ice::Context* ctx,
+ const ::IceInternal::Function<void (bool)>& response,
+ const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception,
+ const ::IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC
{
@@ -281,11 +280,10 @@ IceProxy::Ice::Object::__begin_ice_isA(
}
Ice::AsyncResultPtr
-IceProxy::Ice::Object::__begin_ice_id(
- const ::Ice::Context* ctx,
- const ::IceInternal::Function<void (const ::std::string&)>& response,
- const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception,
- const ::IceInternal::Function<void (bool)>& sent)
+IceProxy::Ice::Object::__begin_ice_id(const ::Ice::Context* ctx,
+ const ::IceInternal::Function<void (const ::std::string&)>& response,
+ const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception,
+ const ::IceInternal::Function<void (bool)>& sent)
{
class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC
{
@@ -573,13 +571,13 @@ IceProxy::Ice::Object::begin_ice_ping(const Context* ctx,
OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ping_name, del, cookie);
try
{
- __result->__prepare(ice_ping_name, Nonmutating, ctx);
- __result->__writeEmptyParams();
- __result->__invoke(true);
+ __result->prepare(ice_ping_name, Nonmutating, ctx);
+ __result->writeEmptyParams();
+ __result->invoke();
}
catch(const Exception& __ex)
{
- __result->__invokeExceptionAsync(__ex);
+ __result->abort(__ex);
}
return __result;
}
@@ -647,13 +645,13 @@ IceProxy::Ice::Object::begin_ice_ids(const Context* ctx,
OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ids_name, del, cookie);
try
{
- __result->__prepare(ice_ids_name, Nonmutating, ctx);
- __result->__writeEmptyParams();
- __result->__invoke(true);
+ __result->prepare(ice_ids_name, Nonmutating, ctx);
+ __result->writeEmptyParams();
+ __result->invoke();
}
catch(const Exception& __ex)
{
- __result->__invokeExceptionAsync(__ex);
+ __result->abort(__ex);
}
return __result;
}
@@ -690,13 +688,13 @@ IceProxy::Ice::Object::begin_ice_id(const Context* ctx,
OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_id_name, del, cookie);
try
{
- __result->__prepare(ice_id_name, Nonmutating, ctx);
- __result->__writeEmptyParams();
- __result->__invoke(true);
+ __result->prepare(ice_id_name, Nonmutating, ctx);
+ __result->writeEmptyParams();
+ __result->invoke();
}
catch(const Exception& __ex)
{
- __result->__invokeExceptionAsync(__ex);
+ __result->abort(__ex);
}
return __result;
}
@@ -818,13 +816,13 @@ IceProxy::Ice::Object::begin_ice_invoke(const string& operation,
OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_invoke_name, del, cookie);
try
{
- __result->__prepare(operation, mode, ctx);
- __result->__writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first));
- __result->__invoke(true);
+ __result->prepare(operation, mode, ctx);
+ __result->writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first));
+ __result->invoke();
}
catch(const Exception& __ex)
{
- __result->__invokeExceptionAsync(__ex);
+ __result->abort(__ex);
}
return __result;
}
@@ -1389,17 +1387,16 @@ AsyncResultPtr
IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del,
const ::Ice::LocalObjectPtr& cookie)
{
- ::IceInternal::GetConnectionOutgoingAsyncPtr __result =
- new ::IceInternal::GetConnectionOutgoingAsync(this, ice_getConnection_name, del, cookie);
+ ProxyGetConnectionPtr result = new ProxyGetConnection(this, ice_getConnection_name, del, cookie);
try
{
- __result->__invoke();
+ result->invoke();
}
- catch(const Exception& __ex)
+ catch(const Exception& ex)
{
- __result->__invokeExceptionAsync(__ex);
+ result->abort(ex);
}
- return __result;
+ return result;
}
ConnectionPtr
@@ -1435,32 +1432,31 @@ IceProxy::Ice::Object::ice_getCachedConnection() const
void
IceProxy::Ice::Object::ice_flushBatchRequests()
{
- BatchOutgoing __og(this, ice_flushBatchRequests_name);
- __og.invoke();
+ FlushBatch og(this, ice_flushBatchRequests_name);
+ og.invoke();
}
::Ice::AsyncResultPtr
IceProxy::Ice::Object::begin_ice_flushBatchRequestsInternal(const ::IceInternal::CallbackBasePtr& del,
const ::Ice::LocalObjectPtr& cookie)
{
- ::IceInternal::ProxyBatchOutgoingAsyncPtr __result =
- new ::IceInternal::ProxyBatchOutgoingAsync(this, ice_flushBatchRequests_name, del, cookie);
+ ProxyFlushBatchPtr result = new ProxyFlushBatch(this, ice_flushBatchRequests_name, del, cookie);
try
{
- __result->__invoke();
+ result->invoke();
}
- catch(const Exception& __ex)
+ catch(const Exception& ex)
{
- __result->__invokeExceptionAsync(__ex);
+ result->abort(ex);
}
- return __result;
+ return result;
}
void
-IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& __result)
+IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& result)
{
- AsyncResult::__check(__result, this, ice_flushBatchRequests_name);
- __result->__wait();
+ AsyncResult::__check(result, this, ice_flushBatchRequests_name);
+ result->__wait();
}
Int
diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp
index 68ca8b02598..fb2aea3e337 100644
--- a/cpp/src/Ice/ProxyFactory.cpp
+++ b/cpp/src/Ice/ProxyFactory.cpp
@@ -223,7 +223,7 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co
//
// Don't retry invocation timeouts.
//
- if(dynamic_cast<const InvocationTimeoutException*>(&ex))
+ if(dynamic_cast<const InvocationTimeoutException*>(&ex) || dynamic_cast<const InvocationCanceledException*>(&ex))
{
ex.ice_throw();
}
diff --git a/cpp/src/Ice/RequestHandler.cpp b/cpp/src/Ice/RequestHandler.cpp
index 2cbf7826213..07ec753912d 100644
--- a/cpp/src/Ice/RequestHandler.cpp
+++ b/cpp/src/Ice/RequestHandler.cpp
@@ -14,6 +14,7 @@ using namespace std;
using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(RequestHandler* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(CancellationHandler* p) { return p; }
RetryException::RetryException(const Ice::LocalException& ex)
{
@@ -32,10 +33,6 @@ RetryException::get() const
return _ex.get();
}
-RequestHandler::~RequestHandler()
-{
-}
-
RequestHandler::RequestHandler(const ReferencePtr& reference) :
_reference(reference),
_response(reference->getMode() == Reference::ModeTwoway)
diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h
index 73c3e818b56..68ff00d647d 100644
--- a/cpp/src/Ice/RequestHandler.h
+++ b/cpp/src/Ice/RequestHandler.h
@@ -31,7 +31,7 @@ namespace IceInternal
class BasicStream;
-class OutgoingMessageCallback;
+class OutgoingBase;
//
// An exception wrapper, which is used to notify that the request
@@ -51,11 +51,17 @@ private:
IceUtil::UniquePtr<Ice::LocalException> _ex;
};
-class RequestHandler : virtual public ::IceUtil::Shared
+class CancellationHandler : virtual public IceUtil::Shared
{
public:
- virtual ~RequestHandler();
+ virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&) = 0;
+ virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&) = 0;
+};
+
+class RequestHandler : public CancellationHandler
+{
+public:
virtual RequestHandlerPtr connect() = 0;
virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&) = 0;
@@ -64,11 +70,8 @@ public:
virtual void finishBatchRequest(BasicStream*) = 0;
virtual void abortBatchRequest() = 0;
- virtual bool sendRequest(OutgoingMessageCallback*) = 0;
- virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&) = 0;
-
- virtual void requestTimedOut(OutgoingMessageCallback*) = 0;
- virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&) = 0;
+ virtual bool sendRequest(OutgoingBase*) = 0;
+ virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&) = 0;
const ReferencePtr& getReference() const { return _reference; } // Inlined for performances.
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp
index adfb6bf9440..c70ed379edd 100644
--- a/cpp/src/Ice/RetryQueue.cpp
+++ b/cpp/src/Ice/RetryQueue.cpp
@@ -18,7 +18,7 @@ using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; }
-IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultPtr& outAsync) :
+IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const ProxyOutgoingAsyncBasePtr& outAsync) :
_queue(queue), _outAsync(outAsync)
{
}
@@ -26,14 +26,7 @@ IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultP
void
IceInternal::RetryTask::runTimerTask()
{
- try
- {
- _outAsync->__processRetry();
- }
- catch(const Ice::LocalException& ex)
- {
- _outAsync->__invokeExceptionAsync(ex);
- }
+ _outAsync->retry(); // Retry again the invocation.
//
// NOTE: this must be called last, destroy() blocks until all task
@@ -44,10 +37,37 @@ IceInternal::RetryTask::runTimerTask()
_queue->remove(this);
}
+void
+IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException&)
+{
+ assert(false);
+}
+
+void
+IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&)
+{
+ if(_queue->cancel(this))
+ {
+ //
+ // We just retry the outgoing async now rather than marking it
+ // as finished. The retry will check for the cancellation
+ // exception and terminate appropriately the request.
+ //
+ _outAsync->retry();
+ }
+}
+
void
IceInternal::RetryTask::destroy()
{
- _outAsync->__invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__));
+ try
+ {
+ _outAsync->abort(CommunicatorDestroyedException(__FILE__, __LINE__));
+ }
+ catch(const CommunicatorDestroyedException&)
+ {
+ // Abort shouldn't throw if there's no callback, ignore.
+ }
}
bool
@@ -61,7 +81,7 @@ IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(ins
}
void
-IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval)
+IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval)
{
Lock sync(*this);
if(!_instance)
@@ -78,6 +98,7 @@ IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval)
throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
_requests.insert(task);
+ out->cancelable(task);
}
void
@@ -119,4 +140,17 @@ IceInternal::RetryQueue::remove(const RetryTaskPtr& task)
}
}
-
+bool
+IceInternal::RetryQueue::cancel(const RetryTaskPtr& task)
+{
+ Lock sync(*this);
+ if(_requests.erase(task) > 0)
+ {
+ if(!_instance && _requests.empty())
+ {
+ notify(); // If we are destroying the queue, destroy is probably waiting on the queue to be empty.
+ }
+ return _instance->timer()->cancel(task);
+ }
+ return false;
+}
diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h
index 5a0b7cf52a2..71c9c946d8d 100644
--- a/cpp/src/Ice/RetryQueue.h
+++ b/cpp/src/Ice/RetryQueue.h
@@ -16,25 +16,31 @@
#include <Ice/RetryQueueF.h>
#include <Ice/OutgoingAsyncF.h>
#include <Ice/InstanceF.h>
+#include <Ice/RequestHandler.h> // For CancellationHandler
namespace IceInternal
{
-class RetryTask : public IceUtil::TimerTask
+class RetryTask : public IceUtil::TimerTask, public CancellationHandler
{
public:
- RetryTask(const RetryQueuePtr&, const Ice::AsyncResultPtr&);
+ RetryTask(const RetryQueuePtr&, const ProxyOutgoingAsyncBasePtr&);
virtual void runTimerTask();
+
+ virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&);
+ virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&);
+
void destroy();
bool operator<(const RetryTask&) const;
+
private:
const RetryQueuePtr _queue;
- const Ice::AsyncResultPtr _outAsync;
+ const ProxyOutgoingAsyncBasePtr _outAsync;
};
typedef IceUtil::Handle<RetryTask> RetryTaskPtr;
@@ -44,12 +50,13 @@ public:
RetryQueue(const InstancePtr&);
- void add(const Ice::AsyncResultPtr&, int);
+ void add(const ProxyOutgoingAsyncBasePtr&, int);
void destroy();
private:
void remove(const RetryTaskPtr&);
+ bool cancel(const RetryTaskPtr&);
friend class RetryTask;
InstancePtr _instance;
diff --git a/cpp/src/Ice/winrt/Makefile.mak b/cpp/src/Ice/winrt/Makefile.mak
index 7eda814e8f6..c05ba0bc7e0 100644
--- a/cpp/src/Ice/winrt/Makefile.mak
+++ b/cpp/src/Ice/winrt/Makefile.mak
@@ -7,13 +7,14 @@
#
# **********************************************************************
-top_srcdir = ..\..\..
+top_srcdir = ..\..\..
LIBNAME = $(SDK_LIBRARY_PATH)\ice.lib
TARGETS = $(LIBNAME)
SOURCE_DIR = ..
OBJS = $(ARCH)\$(CONFIG)\Acceptor.obj \
$(ARCH)\$(CONFIG)\ACM.obj \
+ $(ARCH)\$(CONFIG)\AsyncResult.obj \
$(ARCH)\$(CONFIG)\Base64.obj \
$(ARCH)\$(CONFIG)\Buffer.obj \
$(ARCH)\$(CONFIG)\BasicStream.obj \
diff --git a/cpp/src/IceDiscovery/LocatorI.h b/cpp/src/IceDiscovery/LocatorI.h
index 700f4305129..bb48d8cb509 100644
--- a/cpp/src/IceDiscovery/LocatorI.h
+++ b/cpp/src/IceDiscovery/LocatorI.h
@@ -13,6 +13,8 @@
#include <Ice/Locator.h>
#include <Ice/ProxyF.h>
+#include <set>
+
namespace IceDiscovery
{
diff --git a/cpp/src/IceDiscovery/LookupI.h b/cpp/src/IceDiscovery/LookupI.h
index 1a249b4e0da..58e1e7ca044 100644
--- a/cpp/src/IceDiscovery/LookupI.h
+++ b/cpp/src/IceDiscovery/LookupI.h
@@ -13,6 +13,7 @@
#include <IceDiscovery/IceDiscovery.h>
#include <IceDiscovery/LocatorI.h>
+#include <IceUtil/Timer.h>
#include <Ice/Properties.h>
namespace IceDiscovery
diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h
index 8c37a623def..78258c2ff54 100644
--- a/cpp/src/IceGrid/AdapterCache.h
+++ b/cpp/src/IceGrid/AdapterCache.h
@@ -16,6 +16,8 @@
#include <IceGrid/Query.h>
#include <IceGrid/Internal.h>
+#include <set>
+
namespace IceGrid
{
diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp
index 5bf6c662656..57f3337c06f 100644
--- a/cpp/src/slice2cpp/Gen.cpp
+++ b/cpp/src/slice2cpp/Gen.cpp
@@ -373,8 +373,7 @@ Slice::Gen::generate(const UnitPtr& p)
{
H << "\n#include <Ice/Proxy.h>";
H << "\n#include <Ice/GCObject.h>";
- H << "\n#include <Ice/Outgoing.h>";
- H << "\n#include <Ice/OutgoingAsync.h>";
+ H << "\n#include <Ice/AsyncResult.h>";
H << "\n#include <Ice/Incoming.h>";
if(p->hasContentsWithMetaData("amd"))
{
@@ -382,11 +381,14 @@ Slice::Gen::generate(const UnitPtr& p)
}
C << "\n#include <Ice/LocalException.h>";
C << "\n#include <Ice/ObjectFactory.h>";
+ C << "\n#include <Ice/Outgoing.h>";
+ C << "\n#include <Ice/OutgoingAsync.h>";
}
else if(p->hasLocalClassDefsWithAsync())
{
H << "\n#include <Ice/Proxy.h>";
- H << "\n#include <Ice/OutgoingAsync.h>";
+ H << "\n#include <Ice/AsyncResult.h>";
+ C << "\n#include <Ice/OutgoingAsync.h>";
}
else if(p->hasNonLocalClassDecls())
{
@@ -2184,26 +2186,26 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p)
C << flatName << ", __del, __cookie);";
C << nl << "try";
C << sb;
- C << nl << "__result->__prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);";
+ C << nl << "__result->prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);";
if(inParams.empty())
{
- C << nl << "__result->__writeEmptyParams();";
+ C << nl << "__result->writeEmptyParams();";
}
else
{
- C << nl << "::IceInternal::BasicStream* __os = __result->__startWriteParams(" << opFormatTypeToString(p) <<");";
+ C << nl << "::IceInternal::BasicStream* __os = __result->startWriteParams(" << opFormatTypeToString(p) <<");";
writeMarshalCode(C, inParams, 0, TypeContextInParam);
if(p->sendsClasses(false))
{
C << nl << "__os->writePendingObjects();";
}
- C << nl << "__result->__endWriteParams();";
+ C << nl << "__result->endWriteParams();";
}
- C << nl << "__result->__invoke(true);";
+ C << nl << "__result->invoke();";
C << eb;
C << nl << "catch(const ::Ice::Exception& __ex)";
C << sb;
- C << nl << "__result->__invokeExceptionAsync(__ex);";
+ C << nl << "__result->abort(__ex);";
C << eb;
C << nl << "return __result;";
C << eb;
diff --git a/cpp/src/slice2cs/Gen.cpp b/cpp/src/slice2cs/Gen.cpp
index b877d04ce69..8ba2d479750 100644
--- a/cpp/src/slice2cs/Gen.cpp
+++ b/cpp/src/slice2cs/Gen.cpp
@@ -5312,8 +5312,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p)
_out << sb;
if(op->returnsData())
{
- _out << nl << "IceInternal.OutgoingAsync outAsync__ = (IceInternal.OutgoingAsync)r__;";
- _out << nl << "IceInternal.OutgoingAsync.check(outAsync__, this, " << flatName << ");";
+ _out << nl << "IceInternal.OutgoingAsync outAsync__ = IceInternal.OutgoingAsync.check(r__, this, "
+ << flatName << ");";
_out << nl << "try";
_out << sb;
@@ -5480,11 +5480,11 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p)
_out << nl << "result__.writeEmptyParams();";
}
- _out << nl << "result__.invoke(true);";
+ _out << nl << "result__.invoke();";
_out << eb;
_out << nl << "catch(Ice.Exception ex__)";
_out << sb;
- _out << nl << "result__.invokeExceptionAsync(ex__);";
+ _out << nl << "result__.abort(ex__);";
_out << eb;
_out << nl << "return result__;";
_out << eb;
diff --git a/cpp/src/slice2java/Gen.cpp b/cpp/src/slice2java/Gen.cpp
index a288cfd8290..8eaccb5650e 100644
--- a/cpp/src/slice2java/Gen.cpp
+++ b/cpp/src/slice2java/Gen.cpp
@@ -4752,8 +4752,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p)
out << sb;
if(op->returnsData())
{
- out << nl << "IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;";
- out << nl << "IceInternal.OutgoingAsyncBase.check(__result, this, __" << op->name() << "_name);";
+ out << nl << "IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __"
+ << op->name() << "_name);";
out << nl << "try";
out << sb;
@@ -5737,11 +5737,11 @@ Slice::Gen::HelperVisitor::writeOperation(const ClassDefPtr& p, const string& pa
out << nl << "__result.writeEmptyParams();";
}
- out << nl << "__result.invoke(true);";
+ out << nl << "__result.invoke();";
out << eb;
out << nl << "catch(Ice.Exception __ex)";
out << sb;
- out << nl << "__result.invokeExceptionAsync(__ex);";
+ out << nl << "__result.abort(__ex);";
out << eb;
out << nl << "return __result;";
out << eb;