summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp1080
1 files changed, 886 insertions, 194 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index e7cc8c65540..d7ffc2f4d6d 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -17,91 +17,566 @@
#include <Ice/ImplicitContextI.h>
#include <Ice/ThreadPool.h>
#include <Ice/RetryQueue.h>
+#include <Ice/ConnectionFactory.h>
+#include <Ice/ObjectAdapterFactory.h>
+#include <Ice/LoggerUtil.h>
using namespace std;
using namespace Ice;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
+#endif
+
+const unsigned char OutgoingAsyncBase::OK = 0x1;
+const unsigned char OutgoingAsyncBase::Sent = 0x2;
+#ifndef ICE_CPP11_MAPPING
+const unsigned char OutgoingAsyncBase::Done = 0x4;
+const unsigned char OutgoingAsyncBase::EndCalled = 0x8;
+#endif
+
+OutgoingAsyncCompletionCallback::~OutgoingAsyncCompletionCallback()
+{
+ // Out of line to avoid weak vtable
+}
bool
OutgoingAsyncBase::sent()
{
- return sent(true);
+ return sentImpl(true);
}
bool
-OutgoingAsyncBase::completed(const Exception& ex)
+OutgoingAsyncBase::exception(const Exception& ex)
{
- return finished(ex);
+ return exceptionImpl(ex);
}
bool
-OutgoingAsyncBase::completed()
+OutgoingAsyncBase::response()
{
assert(false); // Must be overriden by request that can handle responses
return false;
}
-BasicStream*
-OutgoingAsyncBase::getIs()
+void
+OutgoingAsyncBase::invokeSentAsync()
{
- return 0; // Must be overriden by request that can handle responses
+ class AsynchronousSent : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousSent(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) :
+ DispatchWorkItem(connection), _outAsync(outAsync)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _outAsync->invokeSent();
+ }
+
+ private:
+
+ const OutgoingAsyncBasePtr _outAsync;
+ };
+
+ //
+ // This is called when it's not safe to call the sent callback
+ // synchronously from this thread. Instead the exception callback
+ // is called asynchronously from the client thread pool.
+ //
+ try
+ {
+ _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, ICE_SHARED_FROM_THIS));
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ }
}
-OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, delegate, cookie),
- _os(instance.get(), Ice::currentProtocolEncoding)
+void
+OutgoingAsyncBase::invokeExceptionAsync()
+{
+ class AsynchronousException : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousException(const ConnectionPtr& c, const OutgoingAsyncBasePtr& outAsync) :
+ DispatchWorkItem(c), _outAsync(outAsync)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _outAsync->invokeException();
+ }
+
+ private:
+
+ const OutgoingAsyncBasePtr _outAsync;
+ };
+
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, ICE_SHARED_FROM_THIS));
+}
+
+void
+OutgoingAsyncBase::invokeResponseAsync()
+{
+ class AsynchronousResponse : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousResponse(const ConnectionPtr& connection, const OutgoingAsyncBasePtr& outAsync) :
+ DispatchWorkItem(connection), _outAsync(outAsync)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _outAsync->invokeResponse();
+ }
+
+ private:
+
+ const OutgoingAsyncBasePtr _outAsync;
+ };
+
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ _instance->clientThreadPool()->dispatch(new AsynchronousResponse(_cachedConnection, ICE_SHARED_FROM_THIS));
+}
+
+void
+OutgoingAsyncBase::invokeSent()
+{
+ try
+ {
+ handleInvokeSent(_sentSynchronously, this);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ if(_observer && _doneInSent)
+ {
+ _observer.detach();
+ }
+}
+
+void
+OutgoingAsyncBase::invokeException()
+{
+ try
+ {
+ handleInvokeException(*_ex, this);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ _observer.detach();
+}
+
+void
+OutgoingAsyncBase::invokeResponse()
+{
+ if(_ex)
+ {
+ invokeException();
+ return;
+ }
+
+ try
+ {
+#ifdef ICE_CPP11_MAPPING
+ try
+ {
+ handleInvokeResponse(_state & OK, this);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(handleException(ex))
+ {
+ handleInvokeException(ex, this);
+ }
+ }
+ catch(const exception_ptr& ex)
+ {
+ rethrow_exception(ex);
+ }
+#else
+ handleInvokeResponse(_state & OK, this);
+#endif
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ _observer.detach();
+}
+
+void
+OutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
+{
+ Lock sync(_m);
+ if(_cancellationException)
+ {
+ try
+ {
+ _cancellationException->ice_throw();
+ }
+ catch(const Ice::LocalException&)
+ {
+ _cancellationException.reset();
+ throw;
+ }
+ }
+ _cancellationHandler = handler;
+}
+
+void
+OutgoingAsyncBase::cancel()
+{
+ cancel(Ice::InvocationCanceledException(__FILE__, __LINE__));
+}
+
+OutgoingAsyncBase::OutgoingAsyncBase(const InstancePtr& instance) :
+ _instance(instance),
+ _sentSynchronously(false),
+ _doneInSent(false),
+ _state(0),
+ _os(instance.get(), Ice::currentProtocolEncoding),
+ _is(instance.get(), Ice::currentProtocolEncoding)
{
}
bool
-OutgoingAsyncBase::sent(bool done)
+OutgoingAsyncBase::sentImpl(bool done)
{
+ Lock sync(_m);
+ bool alreadySent = (_state & Sent) > 0;
+ _state |= Sent;
if(done)
{
+ _doneInSent = true;
_childObserver.detach();
+ _cancellationHandler = 0;
+ }
+
+#ifndef ICE_CPP11_MAPPING
+ if(done)
+ {
+ _state |= Done | OK;
}
- return AsyncResult::sent(done);
+ _m.notifyAll();
+#endif
+
+ bool invoke = handleSent(done, alreadySent);
+ if(!invoke && _doneInSent)
+ {
+ _observer.detach();
+ }
+ return invoke;
}
bool
-OutgoingAsyncBase::finished(const Exception& ex)
+OutgoingAsyncBase::exceptionImpl(const Exception& ex)
{
+ Lock sync(_m);
+ ICE_SET_EXCEPTION_FROM_CLONE(_ex, ex.ice_clone());
if(_childObserver)
{
- _childObserver.failed(ex.ice_name());
+ _childObserver.failed(ex.ice_id());
_childObserver.detach();
}
- return AsyncResult::finished(ex);
+ _cancellationHandler = 0;
+ _observer.failed(ex.ice_id());
+
+#ifndef ICE_CPP11_MAPPING
+ _state |= Done;
+ _m.notifyAll();
+#endif
+
+ bool invoke = handleException(ex);
+ if(!invoke)
+ {
+ _observer.detach();
+ }
+ return invoke;
}
-Ice::ObjectPrx
-ProxyOutgoingAsyncBase::getProxy() const
+bool
+OutgoingAsyncBase::responseImpl(bool ok)
{
- return _proxy;
+ Lock sync(_m);
+ if(ok)
+ {
+ _state |= OK;
+ }
+
+ _cancellationHandler = 0;
+
+#ifndef ICE_CPP11_MAPPING
+ _state |= Done;
+ _m.notifyAll();
+#endif
+
+ bool invoke;
+ try
+ {
+ invoke = handleResponse(ok);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ ICE_SET_EXCEPTION_FROM_CLONE(_ex, ex.ice_clone());
+ invoke = handleException(ex);
+ }
+ if(!invoke)
+ {
+ _observer.detach();
+ }
+ return invoke;
+}
+
+void
+OutgoingAsyncBase::cancel(const Ice::LocalException& ex)
+{
+ CancellationHandlerPtr handler;
+ {
+ Lock sync(_m);
+ ICE_SET_EXCEPTION_FROM_CLONE(_cancellationException, ex.ice_clone());
+ if(!_cancellationHandler)
+ {
+ return;
+ }
+ handler = _cancellationHandler;
+ }
+ handler->asyncRequestCanceled(ICE_SHARED_FROM_THIS, ex);
+}
+
+#ifndef ICE_CPP11_MAPPING
+
+Int
+OutgoingAsyncBase::getHash() const
+{
+ return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
+}
+
+CommunicatorPtr
+OutgoingAsyncBase::getCommunicator() const
+{
+ return 0;
+}
+
+ConnectionPtr
+OutgoingAsyncBase::getConnection() const
+{
+ return 0;
+}
+
+ObjectPrxPtr
+OutgoingAsyncBase::getProxy() const
+{
+ return 0;
+}
+
+Ice::LocalObjectPtr
+OutgoingAsyncBase::getCookie() const
+{
+ return _cookie;
+}
+
+const std::string&
+OutgoingAsyncBase::getOperation() const
+{
+ assert(false); // Must be overriden
+ static string empty;
+ return empty;
+}
+
+bool
+OutgoingAsyncBase::isCompleted() const
+{
+ Lock sync(_m);
+ return (_state & Done) > 0;
+}
+
+void
+OutgoingAsyncBase::waitForCompleted()
+{
+ Lock sync(_m);
+ while(!(_state & Done))
+ {
+ _m.wait();
+ }
+}
+
+bool
+OutgoingAsyncBase::isSent() const
+{
+ Lock sync(_m);
+ return (_state & Sent) > 0;
+}
+
+void
+OutgoingAsyncBase::waitForSent()
+{
+ Lock sync(_m);
+ while(!(_state & Sent) && !_ex.get())
+ {
+ _m.wait();
+ }
}
bool
-ProxyOutgoingAsyncBase::completed(const Exception& exc)
+OutgoingAsyncBase::sentSynchronously() const
+{
+ return _sentSynchronously;
+}
+
+void
+OutgoingAsyncBase::throwLocalException() const
+{
+ Lock sync(_m);
+ if(_ex.get())
+ {
+ _ex->ice_throw();
+ }
+}
+
+bool
+OutgoingAsyncBase::__wait()
+{
+ Lock sync(_m);
+ if(_state & EndCalled)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
+ }
+ _state |= EndCalled;
+ while(!(_state & Done))
+ {
+ _m.wait();
+ }
+
+ if(_ex.get())
+ {
+ _ex->ice_throw();
+ }
+ return _state & OK;
+}
+
+Ice::InputStream*
+OutgoingAsyncBase::__startReadParams()
+{
+ _is.startEncapsulation();
+ return &_is;
+}
+
+void
+OutgoingAsyncBase::__endReadParams()
+{
+ _is.endEncapsulation();
+}
+
+void
+OutgoingAsyncBase::__readEmptyParams()
+{
+ _is.skipEmptyEncapsulation();
+}
+
+void
+OutgoingAsyncBase::__readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz)
+{
+ _is.readEncapsulation(encaps, sz);
+}
+
+void
+OutgoingAsyncBase::__throwUserException()
+{
+ try
+ {
+ _is.startEncapsulation();
+ _is.throwException();
+ }
+ catch(const Ice::UserException&)
+ {
+ _is.endEncapsulation();
+ throw;
+ }
+}
+
+#endif
+
+void
+OutgoingAsyncBase::warning(const std::exception& exc) const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Ice::Warning out(_instance->initializationData().logger);
+ const Ice::Exception* ex = dynamic_cast<const Ice::Exception*>(&exc);
+ if(ex)
+ {
+ out << "Ice::Exception raised by AMI callback:\n" << *ex;
+ }
+ else
+ {
+ out << "std::exception raised by AMI callback:\n" << exc.what();
+ }
+ }
+}
+
+void
+OutgoingAsyncBase::warning() const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Ice::Warning out(_instance->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
+ }
+}
+
+bool
+ProxyOutgoingAsyncBase::exception(const Exception& exc)
{
if(_childObserver)
{
- _childObserver.failed(exc.ice_name());
+ _childObserver.failed(exc.ice_id());
_childObserver.detach();
}
_cachedConnection = 0;
if(_proxy->__reference()->getInvocationTimeout() == -2)
{
- _instance->timer()->cancel(this);
+ _instance->timer()->cancel(ICE_SHARED_FROM_THIS);
}
//
@@ -115,16 +590,30 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
// the retry interval is 0. This method can be called with the
// connection locked so we can't just retry here.
//
- _instance->retryQueue()->add(this, handleException(exc));
+ _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, _proxy->__handleException(exc, _handler, _mode, _sent, _cnt));
return false;
}
catch(const Exception& ex)
{
- return finished(ex); // No retries, we're done
+ return exceptionImpl(ex); // No retries, we're done
}
}
void
+ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
+{
+ if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection)
+ {
+ const int timeout = _cachedConnection->timeout();
+ if(timeout > 0)
+ {
+ _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::milliSeconds(timeout));
+ }
+ }
+ OutgoingAsyncBase::cancelable(handler);
+}
+
+void
ProxyOutgoingAsyncBase::retryException(const Exception& ex)
{
try
@@ -136,32 +625,18 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex)
// connection to be done.
//
_proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry.
- _instance->retryQueue()->add(this, 0);
+ _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, 0);
}
catch(const Ice::Exception& exc)
{
- if(completed(exc))
+ if(exception(exc))
{
- invokeCompletedAsync();
+ invokeExceptionAsync();
}
}
}
void
-ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
-{
- if(_proxy->__reference()->getInvocationTimeout() == -2 && _cachedConnection)
- {
- const int timeout = _cachedConnection->timeout();
- if(timeout > 0)
- {
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout));
- }
- }
- AsyncResult::cancelable(handler);
-}
-
-void
ProxyOutgoingAsyncBase::retry()
{
invokeImpl(false);
@@ -172,9 +647,9 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex)
{
assert(!_childObserver);
- if(finished(ex))
+ if(exceptionImpl(ex))
{
- invokeCompletedAsync();
+ invokeExceptionAsync();
}
else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex))
{
@@ -187,18 +662,33 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex)
}
}
-ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
+#ifndef ICE_CPP11_MAPPING
+Ice::ObjectPrx
+ProxyOutgoingAsyncBase::getProxy() const
+{
+ return _proxy;
+}
+
+Ice::CommunicatorPtr
+ProxyOutgoingAsyncBase::getCommunicator() const
+{
+ return _proxy->ice_getCommunicator();
+}
+#endif
+
+ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx) :
+ OutgoingAsyncBase(prx->__reference()->getInstance()),
_proxy(prx),
- _mode(Normal),
+ _mode(ICE_ENUM(OperationMode, Normal)),
_cnt(0),
_sent(false)
{
}
+ProxyOutgoingAsyncBase::~ProxyOutgoingAsyncBase()
+{
+}
+
void
ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
@@ -209,7 +699,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
if(invocationTimeout > 0)
{
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+ _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::milliSeconds(invocationTimeout));
}
}
else
@@ -223,7 +713,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
_sent = false;
_handler = _proxy->__getRequestHandler();
- AsyncStatus status = _handler->sendAsyncRequest(this);
+ AsyncStatus status = _handler->sendAsyncRequest(ICE_SHARED_FROM_THIS);
if(status & AsyncStatusSent)
{
if(userThread)
@@ -252,13 +742,13 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
if(_childObserver)
{
- _childObserver.failed(ex.ice_name());
+ _childObserver.failed(ex.ice_id());
_childObserver.detach();
}
- int interval = handleException(ex);
+ int interval = _proxy->__handleException(ex, _handler, _mode, _sent, _cnt);
if(interval > 0)
{
- _instance->retryQueue()->add(this, interval);
+ _instance->retryQueue()->add(ICE_SHARED_FROM_THIS, interval);
return;
}
else
@@ -278,51 +768,45 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
throw;
}
- else if(finished(ex)) // No retries, we're done
+ else if(exceptionImpl(ex)) // No retries, we're done
{
- invokeCompletedAsync();
+ invokeExceptionAsync();
}
}
}
bool
-ProxyOutgoingAsyncBase::sent(bool done)
+ProxyOutgoingAsyncBase::sentImpl(bool done)
{
_sent = true;
if(done)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+ _instance->timer()->cancel(ICE_SHARED_FROM_THIS);
}
}
- return OutgoingAsyncBase::sent(done);
+ return OutgoingAsyncBase::sentImpl(done);
}
bool
-ProxyOutgoingAsyncBase::finished(const Exception& ex)
+ProxyOutgoingAsyncBase::exceptionImpl(const Exception& ex)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+ _instance->timer()->cancel(ICE_SHARED_FROM_THIS);
}
- return OutgoingAsyncBase::finished(ex);
+ return OutgoingAsyncBase::exceptionImpl(ex);
}
bool
-ProxyOutgoingAsyncBase::finished(bool ok)
+ProxyOutgoingAsyncBase::responseImpl(bool ok)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+ _instance->timer()->cancel(ICE_SHARED_FROM_THIS);
}
- return AsyncResult::finished(ok);
-}
-
-int
-ProxyOutgoingAsyncBase::handleException(const Exception& exc)
-{
- return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
+ return OutgoingAsyncBase::responseImpl(ok);
}
void
@@ -338,22 +822,20 @@ ProxyOutgoingAsyncBase::runTimerTask()
}
}
-OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- ProxyOutgoingAsyncBase(prx, operation, delegate, cookie),
- _encoding(getCompatibleEncoding(prx->__reference()->getEncoding()))
+OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx, bool synchronous) :
+ ProxyOutgoingAsyncBase(prx),
+ _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())),
+ _synchronous(synchronous)
{
}
void
-OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context)
+OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context& context)
{
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
_mode = mode;
- _observer.attach(_proxy.get(), operation, context);
+ _observer.attach(_proxy, operation, context);
switch(_proxy->__reference()->getMode())
{
@@ -394,12 +876,12 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex
_os.write(static_cast<Byte>(_mode));
- if(context != 0)
+ if(&context != &Ice::noExplicitContext)
{
//
// Explicit context
//
- _os.write(*context);
+ _os.write(context);
}
else
{
@@ -422,61 +904,11 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex
bool
OutgoingAsync::sent()
{
- return ProxyOutgoingAsyncBase::sent(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy
-}
-
-AsyncStatus
-OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response)
-{
- _cachedConnection = connection;
- return connection->sendAsyncRequest(this, compress, response, 0);
-}
-
-AsyncStatus
-OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler)
-{
- return handler->invokeAsyncRequest(this, 0);
-}
-
-void
-OutgoingAsync::abort(const Exception& ex)
-{
- const Reference::Mode mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os);
- }
-
- ProxyOutgoingAsyncBase::abort(ex);
-}
-
-void
-OutgoingAsync::invoke()
-{
- const Reference::Mode mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _sentSynchronously = true;
- _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, getOperation());
- finished(true);
- return; // Don't call sent/completed callback for batch AMI requests
- }
-
- //
- // NOTE: invokeImpl doesn't throw so this can be called from the
- // try block with the catch block calling abort() in case of an
- // exception.
- //
- invokeImpl(true); // userThread = true
+ return ProxyOutgoingAsyncBase::sentImpl(!_proxy->ice_isTwoway()); // done = true if it's not a two-way proxy
}
bool
-OutgoingAsync::completed()
+OutgoingAsync::response()
{
//
// NOTE: this method is called from ConnectionI.parseMessage
@@ -612,22 +1044,116 @@ OutgoingAsync::completed()
}
}
- return finished(replyStatus == replyOK);
+ return responseImpl(replyStatus == replyOK);
}
catch(const Exception& ex)
{
- return completed(ex);
+ return exception(ex);
+ }
+}
+
+AsyncStatus
+OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response)
+{
+ _cachedConnection = connection;
+ return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, response, 0);
+}
+
+AsyncStatus
+OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ return handler->invokeAsyncRequest(this, 0, _synchronous);
+}
+
+void
+OutgoingAsync::abort(const Exception& ex)
+{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
+ {
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os);
+ }
+
+ ProxyOutgoingAsyncBase::abort(ex);
+}
+
+void
+OutgoingAsync::invoke(const string& operation)
+{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
+ {
+ _sentSynchronously = true;
+ _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, operation);
+ responseImpl(true);
+ return; // Don't call sent/completed callback for batch AMI requests
+ }
+
+ //
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
+ //
+ invokeImpl(true); // userThread = true
+}
+
+#ifdef ICE_CPP11_MAPPING
+void
+OutgoingAsync::invoke(const string& operation,
+ Ice::OperationMode mode,
+ Ice::FormatType format,
+ const Ice::Context& context,
+ function<void(Ice::OutputStream*)> write)
+{
+ try
+ {
+ prepare(operation, mode, context);
+ if(write)
+ {
+ _os.startEncapsulation(_encoding, format);
+ write(&_os);
+ _os.endEncapsulation();
+ }
+ else
+ {
+ _os.writeEmptyEncapsulation(_encoding);
+ }
+ invoke(operation);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ abort(ex);
}
}
-ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie)
+void
+OutgoingAsync::throwUserException()
+{
+ try
+ {
+ _is.startEncapsulation();
+ _is.throwException();
+ }
+ catch(const UserException& ex)
+ {
+ _is.endEncapsulation();
+ if(_userException)
+ {
+ _userException(ex);
+ }
+ throw UnknownUserException(__FILE__, __LINE__, ex.ice_id());
+ }
+}
+
+#endif
+
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy)
{
- _observer.attach(proxy.get(), operation, 0);
- _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os);
}
AsyncStatus
@@ -645,7 +1171,7 @@ ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compre
}
}
_cachedConnection = connection;
- return connection->sendAsyncRequest(this, compress, false, _batchRequestNum);
+ return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum);
}
AsyncStatus
@@ -662,32 +1188,29 @@ ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
return AsyncStatusSent;
}
}
- return handler->invokeAsyncRequest(this, _batchRequestNum);
+ return handler->invokeAsyncRequest(this, _batchRequestNum, false);
}
void
-ProxyFlushBatchAsync::invoke()
+ProxyFlushBatchAsync::invoke(const string& operation)
{
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
+ _batchRequestNum = _proxy->__getBatchRequestQueue()->swap(&_os);
invokeImpl(true); // userThread = true
}
-ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- ProxyOutgoingAsyncBase(prx, operation, delegate, cookie)
+ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx)
{
- _observer.attach(prx.get(), operation, 0);
}
AsyncStatus
ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
{
_cachedConnection = connection;
- if(finished(true))
+ if(responseImpl(true))
{
- invokeCompletedAsync();
+ invokeResponseAsync();
}
return AsyncStatusSent;
}
@@ -695,28 +1218,29 @@ ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
AsyncStatus
ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
{
- if(finished(true))
+ if(responseImpl(true))
{
- invokeCompletedAsync();
+ invokeResponseAsync();
}
return AsyncStatusSent;
}
+Ice::ConnectionPtr
+ProxyGetConnection::getConnection() const
+{
+ return _cachedConnection;
+}
+
void
-ProxyGetConnection::invoke()
+ProxyGetConnection::invoke(const string& operation)
{
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
invokeImpl(true); // userThread = true
}
-ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection,
- const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const LocalObjectPtr& cookie) :
- OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection)
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
+ OutgoingAsyncBase(instance), _connection(connection)
{
- _observer.attach(instance.get(), operation);
}
ConnectionPtr
@@ -726,8 +1250,9 @@ ConnectionFlushBatchAsync::getConnection() const
}
void
-ConnectionFlushBatchAsync::invoke()
+ConnectionFlushBatchAsync::invoke(const string& operation)
{
+ _observer.attach(_instance.get(), operation);
try
{
AsyncStatus status;
@@ -742,7 +1267,7 @@ ConnectionFlushBatchAsync::invoke()
}
else
{
- status = _connection->sendAsyncRequest(this, false, false, batchRequestNum);
+ status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum);
}
if(status & AsyncStatusSent)
@@ -756,29 +1281,28 @@ ConnectionFlushBatchAsync::invoke()
}
catch(const RetryException& ex)
{
- if(completed(*ex.get()))
+ if(exception(*ex.get()))
{
- invokeCompletedAsync();
+ invokeExceptionAsync();
}
}
catch(const Exception& ex)
{
- if(completed(ex))
+ if(exception(ex))
{
- invokeCompletedAsync();
+ invokeExceptionAsync();
}
}
}
-CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& cb,
- const LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, cb, cookie)
+CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
{
- _observer.attach(instance.get(), operation);
+ // Out of line to avoid weak vtable
+}
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
+ OutgoingAsyncBase(instance)
+{
//
// _useCount is initialized to 1 to prevent premature callbacks.
// The caller must invoke ready() after all flush requests have
@@ -797,46 +1321,77 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
const InstancePtr& instance,
InvocationObserver& observer) :
- OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
- _outAsync(outAsync),
- _observer(observer)
+ OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
{
}
- virtual bool sent()
+ virtual bool
+ sent()
{
_childObserver.detach();
_outAsync->check(false);
return false;
}
- virtual bool completed(const Exception& ex)
+ virtual bool
+ exception(const Exception& ex)
{
- _childObserver.failed(ex.ice_name());
+ _childObserver.failed(ex.ice_id());
_childObserver.detach();
_outAsync->check(false);
return false;
}
- private:
-
- virtual InvocationObserver& getObserver()
+ virtual InvocationObserver&
+ getObserver()
{
return _observer;
}
+ virtual bool handleSent(bool, bool)
+ {
+ return false;
+ }
+
+ virtual bool handleException(const Ice::Exception&)
+ {
+ return false;
+ }
+
+ virtual bool handleResponse(bool)
+ {
+ return false;
+ }
+
+ virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ private:
+
const CommunicatorFlushBatchAsyncPtr _outAsync;
InvocationObserver& _observer;
};
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ Lock sync(_m);
++_useCount;
}
try
{
- OutgoingAsyncBasePtr flushBatch = new FlushBatch(this, _instance, _observer);
+ OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs());
if(batchRequestNum == 0)
{
@@ -855,8 +1410,11 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
}
void
-CommunicatorFlushBatchAsync::ready()
+CommunicatorFlushBatchAsync::invoke(const string& operation)
{
+ _observer.attach(_instance.get(), operation);
+ _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
+ _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
check(true);
}
@@ -864,7 +1422,7 @@ void
CommunicatorFlushBatchAsync::check(bool userThread)
{
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ Lock sync(_m);
assert(_useCount > 0);
if(--_useCount > 0)
{
@@ -872,7 +1430,7 @@ CommunicatorFlushBatchAsync::check(bool userThread)
}
}
- if(sent(true))
+ if(sentImpl(true))
{
if(userThread)
{
@@ -885,3 +1443,137 @@ CommunicatorFlushBatchAsync::check(bool userThread)
}
}
}
+
+#ifdef ICE_CPP11_MAPPING
+
+bool
+LambdaInvoke::handleSent(bool, bool alreadySent)
+{
+ return _sent != nullptr && !alreadySent; // Invoke the sent callback only if not already invoked.
+}
+
+bool
+LambdaInvoke::handleException(const Ice::Exception&)
+{
+ return _exception != nullptr; // Invoke the callback
+}
+
+bool
+LambdaInvoke::handleResponse(bool)
+{
+ return _response != nullptr;
+}
+
+void
+LambdaInvoke::handleInvokeSent(bool sentSynchronously, OutgoingAsyncBase*) const
+{
+ _sent(sentSynchronously);
+}
+
+void
+LambdaInvoke::handleInvokeException(const Ice::Exception& ex, OutgoingAsyncBase*) const
+{
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::Exception&)
+ {
+ _exception(current_exception());
+ }
+}
+
+void
+LambdaInvoke::handleInvokeResponse(bool ok, OutgoingAsyncBase*) const
+{
+ _response(ok);
+}
+
+#else // C++98
+
+namespace
+{
+
+//
+// Dummy class derived from CallbackBase
+// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
+// this allows us to test whether the user supplied a null delegate instance to the
+// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
+// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
+// object code.
+//
+class DummyCallback : public CallbackBase
+{
+public:
+
+ DummyCallback()
+ {
+ }
+
+ virtual void
+ completed(const Ice::AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual CallbackBasePtr
+ verify(const Ice::LocalObjectPtr&)
+ {
+ //
+ // Called by the AsyncResult constructor to verify the delegate. The dummy
+ // delegate is passed when the user used a begin_ method without delegate.
+ // By returning 0 here, we tell the AsyncResult that no delegates was
+ // provided.
+ //
+ return 0;
+ }
+
+ virtual void
+ sent(const AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual bool
+ hasSentCallback() const
+ {
+ assert(false);
+ return false;
+ }
+};
+
+}
+
+//
+// This gives a pointer value to compare against in the generated
+// begin_ method to decide whether the caller passed a null pointer
+// versus the generated inline version of the begin_ method having
+// passed a pointer to the dummy delegate.
+//
+CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
+
+CallbackBase::~CallbackBase()
+{
+ // Out of line to avoid weak vtable
+}
+
+void
+CallbackBase::checkCallback(bool obj, bool cb)
+{
+ if(!obj)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null");
+ }
+ if(!cb)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null");
+ }
+}
+
+GenericCallbackBase::~GenericCallbackBase()
+{
+ // Out of line to avoid weak vtable
+}
+
+
+#endif