summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/AsyncResult.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
commit570455a381e6620f8ddfcca448559d3fa545ba38 (patch)
treefe3fa45e6a643b473d9370babff6224b1a9d4dcb /cpp/src/Ice/AsyncResult.cpp
parentFixed ICE-5726: provide deprecated public StringConverterPlugin (diff)
downloadice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2
ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz
ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'cpp/src/Ice/AsyncResult.cpp')
-rw-r--r--cpp/src/Ice/AsyncResult.cpp602
1 files changed, 602 insertions, 0 deletions
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp
new file mode 100644
index 00000000000..03abc55e020
--- /dev/null
+++ b/cpp/src/Ice/AsyncResult.cpp
@@ -0,0 +1,602 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/AsyncResult.h>
+#include <Ice/ThreadPool.h>
+#include <Ice/Instance.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Properties.h>
+#include <Ice/RequestHandler.h>
+#include <Ice/OutgoingAsync.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; }
+
+const unsigned char Ice::AsyncResult::OK = 0x1;
+const unsigned char Ice::AsyncResult::Done = 0x2;
+const unsigned char Ice::AsyncResult::Sent = 0x4;
+const unsigned char Ice::AsyncResult::EndCalled = 0x8;
+
+void
+AsyncResult::cancel()
+{
+ cancel(InvocationCanceledException(__FILE__, __LINE__));
+}
+
+Int
+AsyncResult::getHash() const
+{
+ return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
+}
+
+CommunicatorPtr
+AsyncResult::getCommunicator() const
+{
+ return _communicator;
+}
+
+ConnectionPtr
+AsyncResult::getConnection() const
+{
+ return 0;
+}
+
+ObjectPrx
+AsyncResult::getProxy() const
+{
+ return 0;
+}
+
+bool
+AsyncResult::isCompleted() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & Done;
+}
+
+void
+AsyncResult::waitForCompleted()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & Done))
+ {
+ _monitor.wait();
+ }
+}
+
+bool
+AsyncResult::isSent() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & Sent;
+}
+
+void
+AsyncResult::waitForSent()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & Sent) && !_exception.get())
+ {
+ _monitor.wait();
+ }
+}
+
+void
+AsyncResult::throwLocalException() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_exception.get())
+ {
+ _exception.get()->ice_throw();
+ }
+}
+
+bool
+AsyncResult::sentSynchronously() const
+{
+ return _sentSynchronously;
+}
+
+LocalObjectPtr
+AsyncResult::getCookie() const
+{
+ return _cookie;
+}
+
+const std::string&
+AsyncResult::getOperation() const
+{
+ return _operation;
+}
+
+void
+AsyncResult::__throwUserException()
+{
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException();
+ }
+ catch(const Ice::UserException&)
+ {
+ _is.endReadEncaps();
+ throw;
+ }
+}
+
+bool
+AsyncResult::__wait()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_state & EndCalled)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
+ }
+ _state |= EndCalled;
+ while(!(_state & Done))
+ {
+ _monitor.wait();
+ }
+ if(_exception.get())
+ {
+ _exception.get()->ice_throw();
+ }
+ return _state & OK;
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
+{
+ __check(r, operation);
+ if(r->getProxy().get() != prx)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding begin_" +
+ operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
+{
+ __check(r, operation);
+ if(r->getCommunicator().get() != com)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
+{
+ __check(r, operation);
+ if(r->getConnection().get() != con)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
+ " does not match connection that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+}
+
+void
+AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
+{
+ if(!r)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
+ }
+ else if(&r->_operation != &operation)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
+ " method: " + r->_operation);
+ }
+}
+
+AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
+ const IceInternal::InstancePtr& instance,
+ const string& op,
+ const CallbackBasePtr& del,
+ const LocalObjectPtr& cookie) :
+ _instance(instance),
+ _sentSynchronously(false),
+ _is(instance.get(), Ice::currentProtocolEncoding),
+ _communicator(communicator),
+ _operation(op),
+ _callback(del),
+ _cookie(cookie),
+ _state(0)
+{
+ if(!_callback)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
+ }
+ const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie);
+}
+
+AsyncResult::~AsyncResult()
+{
+}
+
+bool
+AsyncResult::sent(bool done)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(!_exception.get());
+
+ bool alreadySent = _state & Sent;
+ _state |= Sent;
+ if(done)
+ {
+ _state |= Done | OK;
+ _cancellationHandler = 0;
+ if(!_callback || !_callback->hasSentCallback())
+ {
+ _observer.detach();
+ }
+ }
+
+ _monitor.notifyAll();
+ return !alreadySent && _callback && _callback->hasSentCallback();
+}
+
+bool
+AsyncResult::finished(bool ok)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ if(ok)
+ {
+ _state |= OK;
+ }
+ _cancellationHandler = 0;
+ if(!_callback)
+ {
+ _observer.detach();
+ }
+ _monitor.notifyAll();
+ return _callback;
+}
+
+bool
+AsyncResult::finished(const Ice::Exception& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _exception.reset(ex.ice_clone());
+ _cancellationHandler = 0;
+ _observer.failed(ex.ice_name());
+ if(!_callback)
+ {
+ _observer.detach();
+ }
+ _monitor.notifyAll();
+ return _callback;
+}
+
+void
+AsyncResult::invokeSentAsync()
+{
+ class AsynchronousSent : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousSent(const ConnectionPtr& connection, const AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _result(result)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _result->invokeSent();
+ }
+
+ private:
+
+ const AsyncResultPtr _result;
+ };
+
+ //
+ // This is called when it's not safe to call the sent callback
+ // synchronously from this thread. Instead the exception callback
+ // is called asynchronously from the client thread pool.
+ //
+ try
+ {
+ _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this));
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ }
+}
+
+void
+AsyncResult::invokeCompletedAsync()
+{
+ class AsynchronousCompleted : public DispatchWorkItem
+ {
+ public:
+
+ AsynchronousCompleted(const ConnectionPtr& connection, const AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _result(result)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _result->invokeCompleted();
+ }
+
+ private:
+
+ const AsyncResultPtr _result;
+ };
+
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ _instance->clientThreadPool()->dispatch(new AsynchronousCompleted(_cachedConnection, this));
+}
+
+void
+AsyncResult::invokeSent()
+{
+ assert(_callback);
+
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->sent(self);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ if(_observer)
+ {
+ ObjectPrx proxy = getProxy();
+ if(!proxy || !proxy->ice_isTwoway())
+ {
+ _observer.detach();
+ }
+ }
+}
+
+void
+AsyncResult::invokeCompleted()
+{
+ assert(_callback);
+
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->completed(self);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ _observer.detach();
+}
+
+void
+AsyncResult::cancel(const Ice::LocalException& ex)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _cancellationException.reset(ex.ice_clone());
+ if(!_cancellationHandler)
+ {
+ return;
+ }
+ }
+ _cancellationHandler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex);
+}
+
+void
+AsyncResult::cancelable(const CancellationHandlerPtr& handler)
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(!_cancellationException.get())
+ {
+ _cancellationHandler = handler;
+ return;
+ }
+ }
+ handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get());
+}
+
+void
+AsyncResult::checkCanceled()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_cancellationException.get())
+ {
+ _cancellationException->ice_throw();
+ }
+}
+
+void
+AsyncResult::warning(const std::exception& exc) const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ const Exception* ex = dynamic_cast<const Exception*>(&exc);
+ if(ex)
+ {
+ out << "Ice::Exception raised by AMI callback:\n" << *ex;
+ }
+ else
+ {
+ out << "std::exception raised by AMI callback:\n" << exc.what();
+ }
+ }
+}
+
+void
+AsyncResult::warning() const
+{
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
+ }
+}
+
+
+namespace
+{
+
+//
+// Dummy class derived from CallbackBase
+// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
+// this allows us to test whether the user supplied a null delegate instance to the
+// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
+// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
+// object code.
+//
+class DummyCallback : public CallbackBase
+{
+public:
+
+ DummyCallback()
+ {
+ }
+
+ virtual void
+ completed(const Ice::AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual CallbackBasePtr
+ verify(const Ice::LocalObjectPtr&)
+ {
+ //
+ // Called by the AsyncResult constructor to verify the delegate. The dummy
+ // delegate is passed when the user used a begin_ method without delegate.
+ // By returning 0 here, we tell the AsyncResult that no delegates was
+ // provided.
+ //
+ return 0;
+ }
+
+ virtual void
+ sent(const AsyncResultPtr&) const
+ {
+ assert(false);
+ }
+
+ virtual bool
+ hasSentCallback() const
+ {
+ assert(false);
+ return false;
+ }
+};
+
+}
+
+//
+// This gives a pointer value to compare against in the generated
+// begin_ method to decide whether the caller passed a null pointer
+// versus the generated inline version of the begin_ method having
+// passed a pointer to the dummy delegate.
+//
+CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
+
+#ifdef ICE_CPP11
+
+Ice::CallbackPtr
+Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed,
+ const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent)
+{
+ class Cpp11CB : public GenericCallbackBase
+ {
+ public:
+
+ Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed,
+ const ::std::function<void (const AsyncResultPtr&)>& sent) :
+ _completed(completed),
+ _sent(sent)
+ {
+ checkCallback(true, completed != nullptr);
+ }
+
+ virtual void
+ completed(const AsyncResultPtr& result) const
+ {
+ _completed(result);
+ }
+
+ virtual CallbackBasePtr
+ verify(const LocalObjectPtr&)
+ {
+ return this; // Nothing to do, the cookie is not type-safe.
+ }
+
+ virtual void
+ sent(const AsyncResultPtr& result) const
+ {
+ if(_sent != nullptr)
+ {
+ _sent(result);
+ }
+ }
+
+ virtual bool
+ hasSentCallback() const
+ {
+ return _sent != nullptr;
+ }
+
+ private:
+
+ ::std::function< void (const AsyncResultPtr&)> _completed;
+ ::std::function< void (const AsyncResultPtr&)> _sent;
+ };
+
+ return new Cpp11CB(completed, sent);
+}
+#endif
+
+void
+IceInternal::CallbackBase::checkCallback(bool obj, bool cb)
+{
+ if(!obj)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null");
+ }
+ if(!cb)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null");
+ }
+}
+
+