summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
commit570455a381e6620f8ddfcca448559d3fa545ba38 (patch)
treefe3fa45e6a643b473d9370babff6224b1a9d4dcb /cpp/src/Ice/OutgoingAsync.cpp
parentFixed ICE-5726: provide deprecated public StringConverterPlugin (diff)
downloadice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2
ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz
ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp1268
1 files changed, 320 insertions, 948 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index fb57965b60f..b348b1c8cc8 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -9,18 +9,11 @@
#include <IceUtil/DisableWarnings.h>
#include <Ice/OutgoingAsync.h>
-#include <Ice/Object.h>
#include <Ice/ConnectionI.h>
#include <Ice/CollocatedRequestHandler.h>
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/LocalException.h>
-#include <Ice/Properties.h>
-#include <Ice/LoggerUtil.h>
-#include <Ice/LocatorInfo.h>
-#include <Ice/ProxyFactory.h>
-#include <Ice/RouterInfo.h>
-#include <Ice/Protocol.h>
#include <Ice/ReplyStatus.h>
#include <Ice/ImplicitContextI.h>
#include <Ice/ThreadPool.h>
@@ -30,431 +23,289 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; }
-
-IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; }
-
-const unsigned char Ice::AsyncResult::OK = 0x1;
-const unsigned char Ice::AsyncResult::Done = 0x2;
-const unsigned char Ice::AsyncResult::Sent = 0x4;
-const unsigned char Ice::AsyncResult::EndCalled = 0x8;
-
-namespace
-{
-
-class AsynchronousException : public DispatchWorkItem
-{
-public:
-
- AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result,
- const Ice::Exception& ex) :
- DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone())
- {
- }
-
- virtual void
- run()
- {
- _result->__invokeException(*_exception.get());
- }
-
-private:
-
- const Ice::AsyncResultPtr _result;
- const IceUtil::UniquePtr<Ice::Exception> _exception;
-};
-
-class AsynchronousSent : public DispatchWorkItem
-{
-public:
-
- AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) :
- DispatchWorkItem(connection), _result(result)
- {
- }
-
- virtual void
- run()
- {
- _result->__invokeSent();
- }
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; }
-private:
-
- const Ice::AsyncResultPtr _result;
-};
-
-};
-
-Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
- const IceInternal::InstancePtr& instance,
- const string& op,
- const CallbackBasePtr& del,
- const LocalObjectPtr& cookie) :
- _communicator(communicator),
- _instance(instance),
- _operation(op),
- _callback(del),
- _cookie(cookie),
- _is(instance.get(), Ice::currentProtocolEncoding),
- _os(instance.get(), Ice::currentProtocolEncoding),
- _state(0),
- _sentSynchronously(false),
- _exception(0)
+bool
+OutgoingAsyncBase::sent()
{
- if(!_callback)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
- }
- const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie);
+ return sent(true);
}
-Ice::AsyncResult::~AsyncResult()
+bool
+OutgoingAsyncBase::completed(const Exception& ex)
{
+ return finished(ex);
}
-Int
-Ice::AsyncResult::getHash() const
+OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ AsyncResult(communicator, instance, operation, delegate, cookie),
+ _os(instance.get(), Ice::currentProtocolEncoding)
{
- return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
}
bool
-Ice::AsyncResult::isCompleted() const
+OutgoingAsyncBase::sent(bool done)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- return _state & Done;
-}
-
-void
-Ice::AsyncResult::waitForCompleted()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!(_state & Done))
+ if(done)
{
- _monitor.wait();
+ _childObserver.detach();
}
+ return AsyncResult::sent(done);
}
bool
-Ice::AsyncResult::isSent() const
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- return _state & Sent;
-}
-
-void
-Ice::AsyncResult::waitForSent()
+OutgoingAsyncBase::finished(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!(_state & Sent) && !_exception.get())
+ if(_childObserver)
{
- _monitor.wait();
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
}
+ return AsyncResult::finished(ex);
}
-void
-Ice::AsyncResult::throwLocalException() const
+Ice::ObjectPrx
+ProxyOutgoingAsyncBase::getProxy() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_exception.get())
- {
- _exception.get()->ice_throw();
- }
+ return _proxy;
}
bool
-Ice::AsyncResult::__wait()
+ProxyOutgoingAsyncBase::sent()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_state & EndCalled)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
- }
- _state |= EndCalled;
- while(!(_state & Done))
- {
- _monitor.wait();
- }
- if(_exception.get())
- {
- _exception.get()->ice_throw();
- }
- return _state & OK;
+ return sent(!_proxy->ice_isTwoway()); // Done if it's not a two-way proxy (no response expected).
}
-void
-Ice::AsyncResult::__throwUserException()
+bool
+ProxyOutgoingAsyncBase::completed(const Exception& exc)
{
- try
- {
- _is.startReadEncaps();
- _is.throwException();
- }
- catch(const Ice::UserException&)
+ if(_childObserver)
{
- _is.endReadEncaps();
- throw;
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
}
-}
-void
-Ice::AsyncResult::__invokeSent()
-{
//
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
//
-
- if(_callback)
+ try
{
- try
- {
- AsyncResultPtr self(this);
- _callback->sent(self);
- }
- catch(const std::exception& ex)
- {
- __warning(ex);
- }
- catch(...)
- {
- __warning();
- }
+ _instance->retryQueue()->add(this, handleException(exc));
+ return false;
}
-
- if(_observer)
+ catch(const Exception& ex)
{
- Ice::ObjectPrx proxy = getProxy();
- if(!proxy || !proxy->ice_isTwoway())
- {
- _observer.detach();
- }
+ return finished(ex); // No retries, we're done
}
}
void
-Ice::AsyncResult::__invokeSentAsync()
+ProxyOutgoingAsyncBase::retry()
{
- //
- // This is called when it's not safe to call the sent callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this));
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- }
+ invokeImpl(false);
}
void
-Ice::AsyncResult::__invokeException(const Ice::Exception& ex)
+ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex)
{
+ assert(!_childObserver);
+
+ if(finished(ex))
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception.reset(ex.ice_clone());
- _monitor.notifyAll();
+ invokeCompletedAsync();
+ }
+ else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex))
+ {
+ //
+ // If it's a communicator destroyed exception, don't swallow
+ // it but instead notify the user thread. Even if no callback
+ // was provided.
+ //
+ ex.ice_throw();
}
-
- __invokeCompleted();
}
-void
-Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
+ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
+ _proxy(prx),
+ _mode(Normal),
+ _cnt(0),
+ _sent(false)
{
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- // CommunicatorDestroyedException is the only exception that can propagate directly
- // from this method.
- //
- _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex));
}
void
-Ice::AsyncResult::__invokeCompleted()
+ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback)
+ try
{
- try
+ if(userThread)
{
- AsyncResultPtr self(this);
- _callback->completed(self);
+ int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+ }
}
- catch(const std::exception& ex)
+ else
{
- __warning(ex);
+ checkCanceled(); // Cancellation exception aren't retriable
+ _observer.retried();
}
- catch(...)
+
+ while(true)
{
- __warning();
+ try
+ {
+ _sent = false;
+ _handler = _proxy->__getRequestHandler();
+ AsyncStatus status = _handler->sendAsyncRequest(this);
+ if(status & AsyncStatusSent)
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent(); // Call the sent callback from the user thread.
+ }
+ }
+ else
+ {
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSentAsync(); // Call the sent callback from a client thread pool thread.
+ }
+ }
+ }
+ return; // We're done!
+ }
+ catch(const RetryException& ex)
+ {
+ handleRetryException(ex);
+ }
+ catch(const Exception& ex)
+ {
+ if(_childObserver)
+ {
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ }
+ int interval = handleException(ex);
+ if(interval > 0)
+ {
+ _instance->retryQueue()->add(this, interval);
+ return;
+ }
+ else
+ {
+ checkCanceled(); // Cancellation exception aren't retriable
+ _observer.retried();
+ }
+ }
}
}
-
- _observer.detach();
-}
-
-void
-Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
-{
- RequestHandlerPtr handler;
+ catch(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- swap(handler, _timeoutRequestHandler);
- }
-
- if(handler)
- {
- handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this));
+ //
+ // If called from the user thread we re-throw, the exception
+ // will be catch by the caller and abort() will be called.
+ //
+ if(userThread)
+ {
+ throw;
+ }
+ else if(finished(ex)) // No retries, we're done
+ {
+ invokeCompletedAsync();
+ }
}
}
-
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
+bool
+ProxyOutgoingAsyncBase::sent(bool done)
{
- __check(r, operation);
- if(r->getProxy().get() != prx)
+ _sent = true;
+ if(done)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
+ {
+ _instance->timer()->cancel(this);
+ }
}
+ return OutgoingAsyncBase::sent(done);
}
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
+bool
+ProxyOutgoingAsyncBase::finished(const Exception& ex)
{
- __check(r, operation);
- if(r->getCommunicator().get() != com)
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
- " does not match communicator that was used to call corresponding " +
- "begin_" + operation + " method");
+ _instance->timer()->cancel(this);
}
+ return OutgoingAsyncBase::finished(ex);
}
-void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
+bool
+ProxyOutgoingAsyncBase::finished(bool ok)
{
- __check(r, operation);
- if(r->getConnection().get() != con)
+ if(_proxy->__reference()->getInvocationTimeout() > 0)
{
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
- " does not match connection that was used to call corresponding " +
- "begin_" + operation + " method");
+ _instance->timer()->cancel(this);
}
+ return AsyncResult::finished(ok);
}
void
-Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
+ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc)
{
- if(!r)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
- }
- else if(&r->_operation != &operation)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
- " method: " + r->_operation);
- }
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry.
}
-
-void
-Ice::AsyncResult::__warning(const std::exception& exc) const
+int
+ProxyOutgoingAsyncBase::handleException(const Exception& exc)
{
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
- {
- Warning out(_instance->initializationData().logger);
- const Exception* ex = dynamic_cast<const Exception*>(&exc);
- if(ex)
- {
- out << "Ice::Exception raised by AMI callback:\n" << *ex;
- }
- else
- {
- out << "std::exception raised by AMI callback:\n" << exc.what();
- }
- }
+ return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
}
void
-Ice::AsyncResult::__warning() const
+ProxyOutgoingAsyncBase::runTimerTask()
{
- if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ try
{
- Warning out(_instance->initializationData().logger);
- out << "unknown exception raised by AMI callback";
+ cancel(InvocationTimeoutException(__FILE__, __LINE__));
}
-}
-
-void
-IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool,
- const Ice::ConnectionPtr& connection)
-{
- class InvocationTimeoutCall : public DispatchWorkItem
+ catch(const CommunicatorDestroyedException&)
{
- public:
-
- InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) :
- DispatchWorkItem(connection), _outAsync(outAsync)
- {
- }
-
- virtual void
- run()
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- _outAsync->__finished(ex);
- }
-
- private:
-
- const OutgoingAsyncMessageCallbackPtr _outAsync;
- };
- threadPool->dispatch(new InvocationTimeoutCall(this, connection));
+ }
}
-IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(prx),
+OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate, cookie),
_encoding(getCompatibleEncoding(prx->__reference()->getEncoding()))
{
}
void
-IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
+OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context)
{
- _handler = 0;
- _cnt = 0;
- _sent = false;
- _mode = mode;
- _sentSynchronously = false;
-
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
+ _mode = mode;
_observer.attach(_proxy.get(), operation, context);
switch(_proxy->__reference()->getMode())
@@ -482,7 +333,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
}
- catch(const Ice::LocalException& ex)
+ catch(const LocalException& ex)
{
_observer.failed(ex.ice_name());
_proxy->__setRequestHandler(_handler, 0); // Clear request handler
@@ -541,109 +392,63 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
}
AsyncStatus
-IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response)
{
_cachedConnection = connection;
return connection->sendAsyncRequest(this, compress, response);
}
AsyncStatus
-IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler)
{
return handler->invokeAsyncRequest(this);
}
-bool
-IceInternal::OutgoingAsync::__sent()
+void
+OutgoingAsync::abort(const Exception& ex)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
- bool alreadySent = _state & Sent; // Expected in case of a retry.
- _state |= Sent;
- _sent = true;
-
- assert(!(_state & Done));
- if(!_proxy->ice_isTwoway())
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- _childObserver.detach();
- if(!_callback || !_callback->hasSentCallback())
+ if(_handler)
{
- _observer.detach();
- }
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _handler->abortBatchRequest();
}
- _state |= Done | OK;
- //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
}
- _monitor.notifyAll();
- return !alreadySent && _callback && _callback->hasSentCallback();
-}
-
-void
-IceInternal::OutgoingAsync::__invokeSent()
-{
- ::Ice::AsyncResult::__invokeSent();
+
+ ProxyOutgoingAsyncBase::abort(ex);
}
void
-IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
+OutgoingAsync::invoke()
{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!(_state & Done));
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- if(_timeoutRequestHandler)
+ if(_handler)
{
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
+ _sentSynchronously = true;
+ _handler->finishBatchRequest(&_os);
+ finished(true);
}
+ return; // Don't call sent/completed callback for batch AMI requests
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
//
- try
- {
- handleException(exc);
- }
- catch(const Ice::Exception& ex)
- {
- __invokeException(ex);
- }
-}
-
-void
-IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
-{
- if((_state & Done) == 0 && _handler)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- int mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _handler->abortBatchRequest();
- }
- }
- AsyncResult::__invokeExceptionAsync(ex);
-}
-
-void
-IceInternal::OutgoingAsync::__processRetry()
-{
- __invoke(false);
+ invokeImpl(true); // userThread = true
}
bool
-IceInternal::OutgoingAsync::__finished()
+OutgoingAsync::completed()
{
//
// NOTE: this method is called from ConnectionI.parseMessage
@@ -652,25 +457,15 @@ IceInternal::OutgoingAsync::__finished()
//
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
- Ice::Byte replyStatus;
- try
+ if(_childObserver)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!_exception.get() && !(_state & Done));
- assert(!_is.b.empty());
-
- if(_childObserver)
- {
- _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
- }
+ _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4));
_childObserver.detach();
+ }
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
-
+ Byte replyStatus;
+ try
+ {
_is.read(replyStatus);
switch(replyStatus)
@@ -789,373 +584,197 @@ IceInternal::OutgoingAsync::__finished()
}
}
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- if(replyStatus == replyOK)
- {
- _state |= OK;
- }
- _monitor.notifyAll();
-
- if(!_callback)
- {
- _observer.detach();
- return false;
- }
- return true;
+ return finished(replyStatus == replyOK);
}
- catch(const LocalException& exc)
+ catch(const Exception& ex)
{
- //
- // We don't call finished(exc) here because we don't want
- // to invoke the completion callback. The completion
- // callback is invoked by the connection is this method
- // returns true.
- //
- try
- {
- handleException(exc);
- return false; // Invocation will be retried.
- }
- catch(const Ice::Exception& ex)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception.reset(ex.ice_clone());
- _monitor.notifyAll();
-
- if(!_callback)
- {
- _observer.detach();
- return false;
- }
- return true;
- }
+ return completed(ex);
}
}
+ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie)
+{
+ _observer.attach(proxy.get(), operation, 0);
+}
+
bool
-IceInternal::OutgoingAsync::__invoke(bool userThread)
+ProxyFlushBatch::sent()
{
- const Reference::Mode mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _state |= Done | OK;
- _handler->finishBatchRequest(&_os);
- _observer.detach();
- return true;
- }
+ return ProxyOutgoingAsyncBase::sent(true); // Overriden because the flush is done even if using a two-way proxy.
+}
- while(true)
- {
- try
- {
- _sent = false;
- _handler = _proxy->__getRequestHandler();
- AsyncStatus status = _handler->sendAsyncRequest(this);
- if(status & AsyncStatusSent)
- {
- if(userThread)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
- {
- __invokeSent(); // Call the sent callback from the user thread.
- }
- }
- else
- {
- if(status & AsyncStatusInvokeSentCallback)
- {
- __invokeSentAsync(); // Call the sent callback from a client thread pool thread.
- }
- }
- }
+AsyncStatus
+ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool)
+{
+ _cachedConnection = connection;
+ return connection->flushAsyncBatchRequests(this);
+}
- if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent))
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(!(_state & Done))
- {
- int invocationTimeout = _handler->getReference()->getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
- _timeoutRequestHandler = _handler;
- }
- }
- }
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
- continue;
- }
- catch(const Ice::Exception& ex)
- {
- handleException(ex);
- }
- break;
- }
- return _sentSynchronously;
+AsyncStatus
+ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ return handler->invokeAsyncBatchRequests(this);
}
void
-IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
+ProxyFlushBatch::invoke()
{
- try
- {
- int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
- _observer.retried(); // Invocation is being retried.
-
- //
- // Schedule the retry. Note that we always schedule the retry
- // on the retry queue even if the invocation can be retried
- // immediately. This is required because it might not be safe
- // to retry from this thread (this is for instance called by
- // finished(BasicStream) which is called with the connection
- // locked.
- //
- _instance->retryQueue()->add(this, interval);
- }
- catch(const Ice::Exception& ex)
- {
- _observer.failed(ex.ice_name());
- throw;
- }
+ checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
+ invokeImpl(true); // userThread = true
}
-IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, delegate, cookie)
+void
+ProxyFlushBatch::handleRetryException(const RetryException& ex)
{
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
}
-AsyncStatus
-IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool)
+int
+ProxyFlushBatch::handleException(const Exception& ex)
{
- _cachedConnection = connection;
- return connection->flushAsyncBatchRequests(this);
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
+ return 0;
}
-AsyncStatus
-IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate, cookie)
{
- return handler->invokeAsyncBatchRequests(this);
+ _observer.attach(prx.get(), operation, 0);
}
-bool
-IceInternal::BatchOutgoingAsync::__sent()
+AsyncStatus
+ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(!_exception.get());
- _state |= Done | OK | Sent;
- //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
- _childObserver.detach();
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
- _monitor.notifyAll();
- if(!_callback || !_callback->hasSentCallback())
+ _cachedConnection = connection;
+ if(finished(true))
{
- _observer.detach();
- return false;
+ invokeCompletedAsync();
}
- return true;
+ return AsyncStatusSent;
}
-void
-IceInternal::BatchOutgoingAsync::__invokeSent()
+AsyncStatus
+ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
{
- ::Ice::AsyncResult::__invokeSent();
+ if(finished(true))
+ {
+ invokeCompletedAsync();
+ }
+ return AsyncStatusSent;
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc)
+ProxyGetConnection::invoke()
{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- if(_timeoutRequestHandler)
- {
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
- }
- }
- __invokeException(exc);
+ invokeImpl(true); // userThread = true
}
-void
-IceInternal::BatchOutgoingAsync::__processRetry()
+ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate,
+ const LocalObjectPtr& cookie) :
+ OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection)
{
- assert(false); // Retries are never scheduled
+ _observer.attach(instance.get(), operation);
}
-IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(proxy)
+ConnectionPtr
+ConnectionFlushBatch::getConnection() const
{
- _observer.attach(proxy.get(), operation, 0);
+ return _connection;
}
void
-IceInternal::ProxyBatchOutgoingAsync::__invoke()
+ConnectionFlushBatch::invoke()
{
- checkSupportedProtocol(_proxy->__reference()->getProtocol());
-
- RequestHandlerPtr handler;
try
{
- handler = _proxy->__getRequestHandler();
- AsyncStatus status = handler->sendAsyncRequest(this);
+ AsyncStatus status = _connection->flushAsyncBatchRequests(this);
if(status & AsyncStatusSent)
{
_sentSynchronously = true;
if(status & AsyncStatusInvokeSentCallback)
{
- __invokeSent();
- }
- }
- else
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(!(_state & Done))
- {
- int invocationTimeout = handler->getReference()->getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
- _timeoutRequestHandler = handler;
- }
+ invokeSent();
}
}
}
- catch(const RetryException&)
- {
- //
- // Clear request handler but don't retry or throw. Retrying
- // isn't useful, there were no batch requests associated with
- // the proxy's request handler.
- //
- _proxy->__setRequestHandler(handler, 0);
- }
- catch(const Ice::Exception& ex)
+ catch(const Exception& ex)
{
- _observer.failed(ex.ice_name());
- _proxy->__setRequestHandler(handler, 0); // Clear request handler
- throw; // Throw to notify the user that batch requests were potentially lost.
- }
-}
-
-IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con,
- const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- BatchOutgoingAsync(communicator, instance, operation, delegate, cookie),
- _connection(con)
-{
- _observer.attach(instance.get(), operation);
-}
-
-void
-IceInternal::ConnectionBatchOutgoingAsync::__invoke()
-{
- AsyncStatus status = _connection->flushAsyncBatchRequests(this);
- if(status & AsyncStatusSent)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
+ if(completed(ex))
{
- __invokeSent();
+ invokeCompletedAsync();
}
}
}
-Ice::ConnectionPtr
-IceInternal::ConnectionBatchOutgoingAsync::getConnection() const
+CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie) :
+ AsyncResult(communicator, instance, operation, cb, cookie)
{
- return _connection;
-}
+ _observer.attach(instance.get(), operation);
-IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator,
- const InstancePtr& instance,
- const string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(communicator, instance, operation, delegate, cookie)
-{
//
// _useCount is initialized to 1 to prevent premature callbacks.
// The caller must invoke ready() after all flush requests have
// been initiated.
//
_useCount = 1;
-
- //
- // Assume all connections are flushed synchronously.
- //
- _sentSynchronously = true;
-
- //
- // Attach observer
- //
- _observer.attach(instance.get(), operation);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con)
+CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con)
{
- class BatchOutgoingAsyncI : public BatchOutgoingAsync
+ class FlushBatch : public OutgoingAsyncBase
{
public:
-
- BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync,
- const InstancePtr& instance,
- InvocationObserver& observer) :
- BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
- _outAsync(outAsync), _observer(observer)
+
+ FlushBatch(const CommunicatorFlushBatchPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
+ _outAsync(outAsync),
+ _observer(observer)
{
}
- virtual bool __sent()
+ virtual bool sent()
{
_childObserver.detach();
_outAsync->check(false);
return false;
}
-#ifdef __SUNPRO_CC
- using BatchOutgoingAsync::__sent;
-#endif
-
- virtual void __finished(const Ice::Exception& ex)
+ virtual bool completed(const Exception& ex)
{
_childObserver.failed(ex.ice_name());
_childObserver.detach();
_outAsync->check(false);
+ return false;
}
- virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt,
- Ice::Int requestId, Ice::Int sz)
+ private:
+
+ virtual InvocationObserver& getObserver()
{
- _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz));
+ return _observer;
}
- private:
-
- const CommunicatorBatchOutgoingAsyncPtr _outAsync;
+ const CommunicatorFlushBatchPtr _outAsync;
InvocationObserver& _observer;
};
@@ -1166,13 +785,9 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
try
{
- AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer));
- if(!(status & AsyncStatusSent))
- {
- _sentSynchronously = false;
- }
+ con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer));
}
- catch(const Ice::LocalException&)
+ catch(const LocalException&)
{
check(false);
throw;
@@ -1180,19 +795,13 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::ready()
+CommunicatorFlushBatch::ready()
{
check(true);
}
void
-IceInternal::CommunicatorBatchOutgoingAsync::__processRetry()
-{
- assert(false); // Retries are never scheduled
-}
-
-void
-IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
+CommunicatorFlushBatch::check(bool userThread)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -1201,255 +810,18 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
{
return;
}
- _state |= Done | OK | Sent;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _monitor.notifyAll();
}
- if(!_callback || !_callback->hasSentCallback())
+ if(sent(true))
{
- _observer.detach();
- }
- else
- {
- //
- // _sentSynchronously is immutable here.
- //
- if(!_sentSynchronously || !userThread)
+ if(userThread)
{
- __invokeSentAsync();
+ _sentSynchronously = true;
+ invokeSent();
}
else
{
- AsyncResult::__invokeSent();
- }
- }
-}
-
-IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx,
- const std::string& operation,
- const CallbackBasePtr& delegate,
- const Ice::LocalObjectPtr& cookie) :
- AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
- _proxy(prx),
- _cnt(0)
-{
- _observer.attach(prx.get(), operation, 0);
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__invoke()
-{
- while(true)
- {
- try
- {
- _handler = _proxy->__getRequestHandler();
- _handler->sendAsyncRequest(this);
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0);
- }
- catch(const Ice::Exception& ex)
- {
- handleException(ex);
+ invokeSentAsync();
}
- break;
- }
-}
-
-AsyncStatus
-IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool)
-{
- __sent();
- return AsyncStatusSent;
-}
-
-AsyncStatus
-IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*)
-{
- __sent();
- return AsyncStatusSent;
-}
-
-bool
-IceInternal::GetConnectionOutgoingAsync::__sent()
-{
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _state |= Done;
- _monitor.notifyAll();
- }
- __invokeCompleted();
- return false;
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__invokeSent()
-{
- // No sent callback
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc)
-{
- try
- {
- handleException(exc);
- }
- catch(const Ice::Exception& ex)
- {
- __invokeException(ex);
- }
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::__processRetry()
-{
- __invoke();
-}
-
-void
-IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc)
-{
- try
- {
- _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt));
- _observer.retried(); // Invocation is being retried.
- }
- catch(const Ice::Exception& ex)
- {
- _observer.failed(ex.ice_name());
- throw;
- }
-}
-
-namespace
-{
-
-//
-// Dummy class derived from CallbackBase
-// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
-// this allows us to test whether the user supplied a null delegate instance to the
-// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
-// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
-// object code.
-//
-class DummyCallback : public CallbackBase
-{
-public:
-
- DummyCallback()
- {
- }
-
- virtual void
- completed(const Ice::AsyncResultPtr&) const
- {
- assert(false);
- }
-
- virtual CallbackBasePtr
- verify(const Ice::LocalObjectPtr&)
- {
- //
- // Called by the AsyncResult constructor to verify the delegate. The dummy
- // delegate is passed when the user used a begin_ method without delegate.
- // By returning 0 here, we tell the AsyncResult that no delegates was
- // provided.
- //
- return 0;
- }
-
- virtual void
- sent(const AsyncResultPtr&) const
- {
- assert(false);
- }
-
- virtual bool
- hasSentCallback() const
- {
- assert(false);
- return false;
- }
-};
-
-}
-
-//
-// This gives a pointer value to compare against in the generated
-// begin_ method to decide whether the caller passed a null pointer
-// versus the generated inline version of the begin_ method having
-// passed a pointer to the dummy delegate.
-//
-CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
-
-#ifdef ICE_CPP11
-
-Ice::CallbackPtr
-Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed,
- const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent)
-{
- class Cpp11CB : public GenericCallbackBase
- {
- public:
-
- Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed,
- const ::std::function<void (const AsyncResultPtr&)>& sent) :
- _completed(completed),
- _sent(sent)
- {
- checkCallback(true, completed != nullptr);
- }
-
- virtual void
- completed(const AsyncResultPtr& result) const
- {
- _completed(result);
- }
-
- virtual CallbackBasePtr
- verify(const LocalObjectPtr&)
- {
- return this; // Nothing to do, the cookie is not type-safe.
- }
-
- virtual void
- sent(const AsyncResultPtr& result) const
- {
- if(_sent != nullptr)
- {
- _sent(result);
- }
- }
-
- virtual bool
- hasSentCallback() const
- {
- return _sent != nullptr;
- }
-
- private:
-
- ::std::function< void (const AsyncResultPtr&)> _completed;
- ::std::function< void (const AsyncResultPtr&)> _sent;
- };
-
- return new Cpp11CB(completed, sent);
-}
-#endif
-
-void
-IceInternal::CallbackBase::checkCallback(bool obj, bool cb)
-{
- if(!obj)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null");
- }
- if(!cb)
- {
- throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null");
}
}