diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-10 12:03:07 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-10 12:03:07 +0200 |
commit | 570455a381e6620f8ddfcca448559d3fa545ba38 (patch) | |
tree | fe3fa45e6a643b473d9370babff6224b1a9d4dcb /cpp | |
parent | Fixed ICE-5726: provide deprecated public StringConverterPlugin (diff) | |
download | ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2 ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip |
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'cpp')
50 files changed, 2160 insertions, 1967 deletions
diff --git a/cpp/include/Ice/AsyncResult.h b/cpp/include/Ice/AsyncResult.h new file mode 100644 index 00000000000..5452c5de970 --- /dev/null +++ b/cpp/include/Ice/AsyncResult.h @@ -0,0 +1,353 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_ASYNC_RESULT_H +#define ICE_ASYNC_RESULT_H + +#include <IceUtil/Monitor.h> +#include <IceUtil/Mutex.h> +#include <IceUtil/UniquePtr.h> +#include <Ice/LocalObject.h> +#include <Ice/CommunicatorF.h> +#include <Ice/ConnectionF.h> +#include <Ice/ProxyF.h> +#include <Ice/InstanceF.h> +#include <Ice/RequestHandlerF.h> +#include <Ice/AsyncResultF.h> +#include <Ice/ObserverHelper.h> +#include <Ice/BasicStream.h> + +#ifdef ICE_CPP11 +# include <functional> // for std::function +#endif + +namespace IceInternal +{ + +class CallbackBase; +typedef IceUtil::Handle<CallbackBase> CallbackBasePtr; + +} + +namespace Ice +{ + +class ICE_API AsyncResult : public Ice::LocalObject, private IceUtil::noncopyable +{ +public: + + void cancel(); + + Int getHash() const; + + CommunicatorPtr getCommunicator() const; + virtual ConnectionPtr getConnection() const; + virtual ObjectPrx getProxy() const; + + bool isCompleted() const; + void waitForCompleted(); + + bool isSent() const; + void waitForSent(); + + void throwLocalException() const; + + bool sentSynchronously() const; + LocalObjectPtr getCookie() const; + const std::string& getOperation() const; + + ::IceInternal::BasicStream* __startReadParams() + { + _is.startReadEncaps(); + return &_is; + } + void __endReadParams() + { + _is.endReadEncaps(); + } + void __readEmptyParams() + { + _is.skipEmptyEncaps(); + } + void __readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) + { + _is.readEncaps(encaps, sz); + } + void __throwUserException(); + + bool __wait(); + + static void __check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&); + static void __check(const AsyncResultPtr&, const Connection*, const ::std::string&); + static void __check(const AsyncResultPtr&, const Communicator*, const ::std::string&); + +protected: + + static void __check(const AsyncResultPtr&, const ::std::string&); + + AsyncResult(const CommunicatorPtr&, const IceInternal::InstancePtr&, const std::string&, + const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + virtual ~AsyncResult(); // Must be heap-allocated + + bool sent(bool); + bool finished(bool); + bool finished(const Exception&); + + void invokeSentAsync(); + void invokeCompletedAsync(); + + void invokeSent(); + void invokeCompleted(); + + void cancel(const LocalException&); + void cancelable(const IceInternal::CancellationHandlerPtr&); + void checkCanceled(); + + void warning(const std::exception&) const; + void warning() const; + + // + // This virtual method is necessary for the communicator flush + // batch requests implementation. + // + virtual IceInternal::InvocationObserver& getObserver() + { + return _observer; + } + + const IceInternal::InstancePtr _instance; + IceInternal::InvocationObserver _observer; + Ice::ConnectionPtr _cachedConnection; + bool _sentSynchronously; + + IceInternal::BasicStream _is; + + IceUtil::Monitor<IceUtil::Mutex> _monitor; + +private: + + const CommunicatorPtr _communicator; + const std::string& _operation; + const IceInternal::CallbackBasePtr _callback; + const LocalObjectPtr _cookie; + IceUtil::UniquePtr<Exception> _exception; + + IceInternal::CancellationHandlerPtr _cancellationHandler; + IceUtil::UniquePtr<Ice::LocalException> _cancellationException; + + static const unsigned char OK; + static const unsigned char Done; + static const unsigned char Sent; + static const unsigned char EndCalled; + static const unsigned char Canceled; + unsigned char _state; +}; + +} + +namespace IceInternal +{ + +// +// Base class for all callbacks. +// +class ICE_API CallbackBase : public IceUtil::Shared +{ +public: + + void checkCallback(bool, bool); + + virtual void completed(const ::Ice::AsyncResultPtr&) const = 0; + virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&) = 0; + virtual void sent(const ::Ice::AsyncResultPtr&) const = 0; + virtual bool hasSentCallback() const = 0; +}; + +// +// Base class for generic callbacks. +// +class ICE_API GenericCallbackBase : virtual public CallbackBase +{ +}; + +// +// See comments in OutgoingAsync.cpp +// +extern ICE_API CallbackBasePtr __dummyCallback; + +// +// Generic callback template that requires the caller to down-cast the +// proxy and the cookie that are obtained from the AsyncResult. +// +template<class T> +class AsyncCallback : public GenericCallbackBase +{ +public: + + typedef T callback_type; + typedef IceUtil::Handle<T> TPtr; + + typedef void (T::*Callback)(const ::Ice::AsyncResultPtr&); + + AsyncCallback(const TPtr& instance, Callback cb, Callback sentcb = 0) : + _callback(instance), _completed(cb), _sent(sentcb) + { + checkCallback(instance, cb != 0); + } + + virtual void completed(const ::Ice::AsyncResultPtr& result) const + { + (_callback.get()->*_completed)(result); + } + + virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&) + { + return this; // Nothing to do, the cookie is not type-safe. + } + + virtual void sent(const ::Ice::AsyncResultPtr& result) const + { + if(_sent) + { + (_callback.get()->*_sent)(result); + } + } + + virtual bool hasSentCallback() const + { + return _sent != 0; + } + +private: + + TPtr _callback; + Callback _completed; + Callback _sent; +}; + +#ifdef ICE_CPP11 + +template<typename T> struct callback_type +{ + static const int value = 1; +}; + +template<> struct callback_type<void(const ::Ice::AsyncResultPtr&)> +{ + static const int value = 2; +}; + +template<> struct callback_type<void(const ::Ice::Exception&)> +{ + static const int value = 3; +}; + +template<typename Callable, typename = void> struct callable_type +{ + static const int value = 1; +}; + +template<class Callable> struct callable_type<Callable, typename ::std::enable_if< + ::std::is_class<Callable>::value && + !::std::is_bind_expression<Callable>::value>::type> +{ + template<typename T, T> struct TypeCheck; + template<typename T> struct AsyncResultCallback + { + typedef void (T::*ok)(const ::Ice::AsyncResultPtr&) const; + }; + template<typename T> struct ExceptionCallback + { + typedef void (T::*ok)(const ::Ice::Exception&) const; + }; + + typedef char (&other)[1]; + typedef char (&asyncResult)[2]; + typedef char (&exception)[3]; + + template<typename T> static other check(...); + template<typename T> static asyncResult check(TypeCheck<typename AsyncResultCallback<T>::ok, &T::operator()>*); + template<typename T> static exception check(TypeCheck<typename ExceptionCallback<T>::ok, &T::operator()>*); + + enum { value = sizeof(check<Callable>(0)) }; +}; + +template<> struct callable_type<void(*)(const ::Ice::AsyncResultPtr&)> +{ + static const int value = 2; +}; + +template<> struct callable_type<void(*)(const ::Ice::Exception&)> +{ + static const int value = 3; +}; + +template<typename Callable, typename Callback> struct is_callable +{ + static const bool value = callable_type<Callable>::value == callback_type<Callback>::value; +}; + +template<class S> class Function : public std::function<S> +{ + +public: + + template<typename T> Function(T f, typename ::std::enable_if<is_callable<T, S>::value>::type* = 0) + : std::function<S>(f) + { + } + + Function() + { + } + + Function(::std::nullptr_t) : ::std::function<S>(nullptr) + { + } +}; + +#endif + +} + +namespace Ice +{ + +typedef IceUtil::Handle< ::IceInternal::GenericCallbackBase> CallbackPtr; + +template<class T> CallbackPtr +newCallback(const IceUtil::Handle<T>& instance, + void (T::*cb)(const AsyncResultPtr&), + void (T::*sentcb)(const AsyncResultPtr&) = 0) +{ + return new ::IceInternal::AsyncCallback<T>(instance, cb, sentcb); +} + +template<class T> CallbackPtr +newCallback(T* instance, + void (T::*cb)(const AsyncResultPtr&), + void (T::*sentcb)(const AsyncResultPtr&) = 0) +{ + return new ::IceInternal::AsyncCallback<T>(instance, cb, sentcb); +} + +#ifdef ICE_CPP11 + +ICE_API CallbackPtr +newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>&, + const ::IceInternal::Function<void (const AsyncResultPtr&)>& = + ::IceInternal::Function<void (const AsyncResultPtr&)>()); +#endif + +// +// Operation callbacks are specified in Proxy.h +// +} + +#endif diff --git a/cpp/include/Ice/AsyncResultF.h b/cpp/include/Ice/AsyncResultF.h new file mode 100644 index 00000000000..dac5f9b68cf --- /dev/null +++ b/cpp/include/Ice/AsyncResultF.h @@ -0,0 +1,26 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_ASYNC_RESULT_F_H +#define ICE_ASYNC_RESULT_F_H + +#include <IceUtil/Shared.h> + +#include <Ice/Handle.h> + +namespace Ice +{ + +class AsyncResult; +ICE_API IceUtil::Shared* upCast(::Ice::AsyncResult*); +typedef IceInternal::Handle<AsyncResult> AsyncResultPtr; + +} + +#endif diff --git a/cpp/include/Ice/Initialize.h b/cpp/include/Ice/Initialize.h index 04271017221..7331d12dae5 100644 --- a/cpp/include/Ice/Initialize.h +++ b/cpp/include/Ice/Initialize.h @@ -10,6 +10,7 @@ #ifndef ICE_INITIALIZE_H #define ICE_INITIALIZE_H +#include <IceUtil/Timer.h> #include <Ice/CommunicatorF.h> #include <Ice/PropertiesF.h> #include <Ice/InstanceF.h> diff --git a/cpp/include/Ice/Outgoing.h b/cpp/include/Ice/Outgoing.h index e6697387603..c50b0a28aae 100644 --- a/cpp/include/Ice/Outgoing.h +++ b/cpp/include/Ice/Outgoing.h @@ -35,38 +35,63 @@ namespace IceInternal class CollocatedRequestHandler; // Forward declaration -class ICE_API OutgoingMessageCallback : private IceUtil::noncopyable +class ICE_API OutgoingBase : private IceUtil::noncopyable { public: - virtual ~OutgoingMessageCallback() { } + virtual ~OutgoingBase() { } virtual bool send(const Ice::ConnectionIPtr&, bool, bool) = 0; virtual void invokeCollocated(CollocatedRequestHandler*) = 0; virtual void sent() = 0; - virtual void finished(const Ice::Exception&) = 0; + virtual void completed(const Ice::Exception&) = 0; + + BasicStream* os() { return &_os; } + + void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId) + { + const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - IceInternal::headerSize - 4); + _childObserver.attach(_observer.getRemoteObserver(c, endpt, requestId, size)); + } + + void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) + { + const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - IceInternal::headerSize - 4); + _childObserver.attach(_observer.getCollocatedObserver(adapter, requestId, size)); + } + +protected: + + OutgoingBase(Instance*, const std::string&); + + BasicStream _os; + IceUtil::UniquePtr<Ice::Exception> _exception; + bool _sent; + InvocationObserver _observer; + ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver; + + IceUtil::Monitor<IceUtil::Mutex> _monitor; }; -class ICE_API Outgoing : public OutgoingMessageCallback +class ICE_API Outgoing : public OutgoingBase { public: Outgoing(IceProxy::Ice::Object*, const std::string&, Ice::OperationMode, const Ice::Context*); ~Outgoing(); - bool invoke(); // Returns true if ok, false if user exception. - void abort(const Ice::LocalException&); - virtual bool send(const Ice::ConnectionIPtr&, bool, bool); virtual void invokeCollocated(CollocatedRequestHandler*); + virtual void sent(); - virtual void finished(const Ice::Exception&); + virtual void completed(const Ice::Exception&); - void finished(BasicStream&); + bool invoke(); // Returns true if ok, false if user exception. + void abort(const Ice::LocalException&); + void completed(BasicStream&); // Inlined for speed optimization. - BasicStream* os() { return &_os; } BasicStream* startReadParams() { _is.startReadEncaps(); @@ -117,20 +142,6 @@ public: void throwUserException(); - void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, - Ice::Int requestId, Ice::Int size) - { - _childObserver.attach(_observer.getRemoteObserver(c, endpt, requestId, size)); - } - - void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) - { - _childObserver.attach(_observer.getCollocatedObserver(adapter, - requestId, - static_cast<Ice::Int>(_os.b.size() - - IceInternal::headerSize - 4))); - } - private: // @@ -140,9 +151,7 @@ private: IceProxy::Ice::Object* _proxy; Ice::OperationMode _mode; RequestHandlerPtr _handler; - IceUtil::UniquePtr<Ice::Exception> _exception; - InvocationObserver _observer; - ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver; + IceUtil::Time _invocationTimeoutDeadline; enum { @@ -156,60 +165,27 @@ private: Ice::EncodingVersion _encoding; BasicStream _is; - BasicStream _os; - bool _sent; - - // - // NOTE: we use an attribute for the monitor instead of inheriting - // from the monitor template. Otherwise, the template would be - // exported from the DLL on Windows and could cause linker errors - // because of multiple definition of IceUtil::Monitor<IceUtil::Mutex>, - // see bug 1541. - // - IceUtil::Monitor<IceUtil::Mutex> _monitor; }; -class BatchOutgoing : public OutgoingMessageCallback +class FlushBatch : public OutgoingBase { public: - BatchOutgoing(IceProxy::Ice::Object*, const std::string&); - BatchOutgoing(Ice::ConnectionI*, Instance*, const std::string&); + FlushBatch(IceProxy::Ice::Object*, const std::string&); + FlushBatch(Ice::ConnectionI*, Instance*, const std::string&); void invoke(); virtual bool send(const Ice::ConnectionIPtr&, bool, bool); virtual void invokeCollocated(CollocatedRequestHandler*); - virtual void sent(); - virtual void finished(const Ice::Exception&); - - BasicStream* os() { return &_os; } - void attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, Ice::Int sz) - { - _childObserver.attach(_observer.getRemoteObserver(connection, endpt, 0, sz)); - } - - void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) - { - _childObserver.attach(_observer.getCollocatedObserver(adapter, - requestId, - static_cast<Ice::Int>(_os.b.size() - - IceInternal::headerSize - 4))); - } + virtual void sent(); + virtual void completed(const Ice::Exception&); private: - IceUtil::Monitor<IceUtil::Mutex> _monitor; IceProxy::Ice::Object* _proxy; Ice::ConnectionI* _connection; - bool _sent; - IceUtil::UniquePtr<Ice::Exception> _exception; - - BasicStream _os; - - InvocationObserver _observer; - ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver; }; } diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index 422ad6d65db..a81419465fa 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -10,276 +10,151 @@ #ifndef ICE_OUTGOING_ASYNC_H #define ICE_OUTGOING_ASYNC_H -#include <IceUtil/Monitor.h> -#include <IceUtil/Mutex.h> #include <IceUtil/Timer.h> -#include <IceUtil/Exception.h> -#include <IceUtil/UniquePtr.h> #include <Ice/OutgoingAsyncF.h> -#include <Ice/RequestHandlerF.h> -#include <Ice/InstanceF.h> -#include <Ice/ReferenceF.h> +#include <Ice/AsyncResult.h> #include <Ice/CommunicatorF.h> #include <Ice/ConnectionIF.h> -#include <Ice/Current.h> -#include <Ice/BasicStream.h> -#include <Ice/ObserverHelper.h> #include <Ice/ObjectAdapterF.h> -#include <Ice/ThreadPoolF.h> - -#ifdef ICE_CPP11 -# include <functional> // for std::function -#endif namespace IceInternal { -class CallbackBase; -typedef IceUtil::Handle<CallbackBase> CallbackBasePtr; - -} - -namespace Ice -{ +class RetryException; +class CollocatedRequestHandler; -class ICE_API AsyncResult : virtual public Ice::LocalObject, protected IceUtil::TimerTask, private IceUtil::noncopyable +// +// Base class for handling asynchronous invocations. This class is +// responsible for the handling of the output stream and the child +// invocation observer. +// +class ICE_API OutgoingAsyncBase : public Ice::AsyncResult { public: - virtual Int getHash() const; - - virtual CommunicatorPtr getCommunicator() const - { - return _communicator; - } - - virtual ConnectionPtr getConnection() const - { - return 0; - } - - virtual ObjectPrx getProxy() const - { - return 0; - } - - bool isCompleted() const; - void waitForCompleted(); + // + // Those methods must be overriden if the invocation is sent + // through a request handler. + // + virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool) { assert(false); return AsyncStatusQueued; } + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) { assert(false); return AsyncStatusQueued; } - bool isSent() const; - void waitForSent(); + virtual bool sent(); + virtual bool completed(const Ice::Exception&); - void throwLocalException() const; + // Those methods are public when called from an OutgoingAsyncBase reference. + using Ice::AsyncResult::cancelable; + using Ice::AsyncResult::invokeSent; + using Ice::AsyncResult::invokeCompleted; + using Ice::AsyncResult::invokeCompletedAsync; - bool sentSynchronously() const + void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId) { - return _sentSynchronously; // No lock needed, immutable once __send() is called + const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4); + _childObserver.attach(getObserver().getRemoteObserver(c, endpt, requestId, size)); } - - LocalObjectPtr getCookie() const + + void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) { - return _cookie; // No lock needed, cookie is immutable + const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4); + _childObserver.attach(getObserver().getCollocatedObserver(adapter, requestId, size)); } - const std::string& getOperation() const - { - return _operation; - } - - ::IceInternal::BasicStream* - __getOs() + BasicStream* getOs() { return &_os; } - ::IceInternal::BasicStream* __startReadParams() - { - _is.startReadEncaps(); - return &_is; - } - void __endReadParams() - { - _is.endReadEncaps(); - } - void __readEmptyParams() - { - _is.skipEmptyEncaps(); - } - void __readParamEncaps(const ::Ice::Byte*& encaps, ::Ice::Int& sz) - { - _is.readEncaps(encaps, sz); - } - - bool __wait(); - void __throwUserException(); - virtual void __invokeExceptionAsync(const Exception&); - void __invokeCompleted(); - - static void __check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&); - static void __check(const AsyncResultPtr&, const Connection*, const ::std::string&); - static void __check(const AsyncResultPtr&, const Communicator*, const ::std::string&); - - virtual void __invokeException(const Exception&); // Required to be public for AsynchronousException - void __invokeSent(); // Required to be public for AsynchronousSent - - void __attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, - Ice::Int requestId, Ice::Int sz) - { - _childObserver.attach(_observer.getRemoteObserver(c, endpt, requestId, sz)); - } - - void __attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) - { - _childObserver.attach(_observer.getCollocatedObserver(adapter, - requestId, - static_cast<Ice::Int>(_os.b.size() - - IceInternal::headerSize - 4))); - } - - // - // Called by the retry queue to process retry. - // - virtual void __processRetry() = 0; - protected: - static void __check(const AsyncResultPtr&, const ::std::string&); - - AsyncResult(const CommunicatorPtr&, const IceInternal::InstancePtr&, const std::string&, - const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); - - void __invokeSentAsync(); - - void runTimerTask(); // Implementation of TimerTask::runTimerTask() - - void __warning(const std::exception&) const; - void __warning() const; - - virtual ~AsyncResult(); // Must be heap-allocated. - - const CommunicatorPtr _communicator; - const IceInternal::InstancePtr _instance; - const std::string& _operation; - Ice::ConnectionPtr _cachedConnection; - const IceInternal::CallbackBasePtr _callback; - const LocalObjectPtr _cookie; + OutgoingAsyncBase(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&, + const Ice::LocalObjectPtr&); - IceUtil::Monitor<IceUtil::Mutex> _monitor; - IceInternal::BasicStream _is; - IceInternal::BasicStream _os; + bool sent(bool); + bool finished(const Ice::Exception&); - IceInternal::RequestHandlerPtr _timeoutRequestHandler; + ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver; - static const unsigned char OK; - static const unsigned char Done; - static const unsigned char Sent; - static const unsigned char EndCalled; - - unsigned char _state; - bool _sentSynchronously; - IceUtil::UniquePtr<Exception> _exception; - IceInternal::InvocationObserver _observer; - IceInternal::ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver; + BasicStream _os; }; -} - -namespace IceInternal -{ - // -// See comments in OutgoingAsync.cpp +// Base class for proxy based invocations. This class handles the +// retry for proxy invocations. It also ensures the child observer is +// correct notified of failures and make sure the retry task is +// correctly canceled when the invocation completes. // -extern ICE_API CallbackBasePtr __dummyCallback; - -class CollocatedRequestHandler; - -// -// This interface is used by the connection to handle OutgoingAsync -// and BatchOutgoingAsync messages. -// -class ICE_API OutgoingAsyncMessageCallback : virtual public Ice::LocalObject +class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, protected IceUtil::TimerTask { public: - virtual ~OutgoingAsyncMessageCallback() - { - } + virtual Ice::ObjectPrx getProxy() const; - // - // Called by the request handler to send the request over the connection. - // - virtual IceInternal::AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool) = 0; + virtual bool sent(); + virtual bool completed(const Ice::Exception&); - // - // Called by the collocated request handler to invoke the request. - // - virtual IceInternal::AsyncStatus __invokeCollocated(CollocatedRequestHandler*) = 0; + void retry(); + void abort(const Ice::Exception&); - // - // Called by the connection when the message is confirmed sent. The connection is locked - // when this is called so this method can't call the sent callback. Instead, this method - // returns true if there's a sent callback and false otherwise. If true is returned, the - // connection will call the __invokeSentCallback() method bellow (which in turn should - // call the sent callback). - // - virtual bool __sent() = 0; +protected: - // - // Called by the connection to call the user sent callback. - // - virtual void __invokeSent() = 0; + ProxyOutgoingAsyncBase(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, + const Ice::LocalObjectPtr&); - // - // Called by the connection when the request failed. - // - virtual void __finished(const Ice::Exception&) = 0; + void invokeImpl(bool); - // - // Helper to dispatch invocation timeout. - // - void __dispatchInvocationTimeout(const ThreadPoolPtr&, const Ice::ConnectionPtr&); + bool sent(bool); + bool finished(const Ice::Exception&); + bool finished(bool); + + virtual void handleRetryException(const RetryException&); + virtual int handleException(const Ice::Exception&); + virtual void runTimerTask(); + + const Ice::ObjectPrx _proxy; + RequestHandlerPtr _handler; + Ice::OperationMode _mode; + +private: + + int _cnt; + bool _sent; }; -class ICE_API OutgoingAsync : public OutgoingAsyncMessageCallback, public Ice::AsyncResult +// +// Class for handling Slice operation invocations +// +class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase { public: OutgoingAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - void __prepare(const std::string&, Ice::OperationMode, const Ice::Context*); + void prepare(const std::string&, Ice::OperationMode, const Ice::Context*); - virtual Ice::ObjectPrx - getProxy() const - { - return _proxy; - } + virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); - virtual IceInternal::AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool); - virtual IceInternal::AsyncStatus __invokeCollocated(CollocatedRequestHandler*); - virtual bool __sent(); - virtual void __invokeSent(); - virtual void __finished(const Ice::Exception&); - virtual void __invokeExceptionAsync(const Ice::Exception&); - virtual void __processRetry(); + void abort(const Ice::Exception&); - bool __finished(); - bool __invoke(bool); + void invoke(); + using ProxyOutgoingAsyncBase::completed; + bool completed(); - BasicStream* __startWriteParams(Ice::FormatType format) + BasicStream* startWriteParams(Ice::FormatType format) { _os.startWriteEncaps(_encoding, format); return &_os; } - void __endWriteParams() + void endWriteParams() { _os.endWriteEncaps(); } - void __writeEmptyParams() + void writeEmptyParams() { _os.writeEmptyEncaps(_encoding); } - void __writeParamEncaps(const ::Ice::Byte* encaps, ::Ice::Int size) + void writeParamEncaps(const ::Ice::Byte* encaps, ::Ice::Int size) { if(size == 0) { @@ -291,315 +166,95 @@ public: } } - ::IceInternal::BasicStream* - __getIs() + BasicStream* getIs() { return &_is; } private: - void handleException(const Ice::Exception&); - - const Ice::ObjectPrx _proxy; const Ice::EncodingVersion _encoding; - - RequestHandlerPtr _handler; - int _cnt; - bool _sent; - Ice::OperationMode _mode; -}; - -class ICE_API BatchOutgoingAsync : public OutgoingAsyncMessageCallback, public Ice::AsyncResult -{ -public: - - BatchOutgoingAsync(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&, - const Ice::LocalObjectPtr&); - - virtual IceInternal::AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool); - virtual IceInternal::AsyncStatus __invokeCollocated(CollocatedRequestHandler*); - virtual bool __sent(); - virtual void __invokeSent(); - virtual void __finished(const Ice::Exception&); - virtual void __processRetry(); -}; - -class ICE_API ProxyBatchOutgoingAsync : public BatchOutgoingAsync -{ -public: - - ProxyBatchOutgoingAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, - const Ice::LocalObjectPtr&); - - void __invoke(); - - virtual Ice::ObjectPrx - getProxy() const - { - return _proxy; - } - -private: - - const Ice::ObjectPrx _proxy; }; -class ICE_API ConnectionBatchOutgoingAsync : public BatchOutgoingAsync -{ -public: - - ConnectionBatchOutgoingAsync(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&, - const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - - void __invoke(); - - virtual Ice::ConnectionPtr getConnection() const; - -private: - - const Ice::ConnectionIPtr _connection; -}; - -class ICE_API CommunicatorBatchOutgoingAsync : public Ice::AsyncResult -{ -public: - - CommunicatorBatchOutgoingAsync(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, - const CallbackBasePtr&, const Ice::LocalObjectPtr&); - - void flushConnection(const Ice::ConnectionIPtr&); - void ready(); - - virtual void __processRetry(); - -private: - - void check(bool); - - int _useCount; -}; - -class ICE_API GetConnectionOutgoingAsync : public OutgoingAsyncMessageCallback, public Ice::AsyncResult +// +// Class for handling the proxy's begin_ice_flushBatchRequest request. +// +class ICE_API ProxyFlushBatch : public ProxyOutgoingAsyncBase { public: - GetConnectionOutgoingAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, - const Ice::LocalObjectPtr&); + ProxyFlushBatch(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - void __invoke(); + virtual bool sent(); - virtual Ice::ObjectPrx - getProxy() const - { - return _proxy; - } + virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); - virtual AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool); - virtual AsyncStatus __invokeCollocated(CollocatedRequestHandler*); - virtual bool __sent(); - virtual void __invokeSent(); - virtual void __finished(const Ice::Exception&); - virtual void __processRetry(); + void invoke(); private: - void handleException(const Ice::Exception&); - - Ice::ObjectPrx _proxy; - RequestHandlerPtr _handler; - int _cnt; + virtual void handleRetryException(const RetryException&); + virtual int handleException(const Ice::Exception&); }; +typedef IceUtil::Handle<ProxyFlushBatch> ProxyFlushBatchPtr; // -// Base class for all callbacks. +// Class for handling the proxy's begin_ice_getConnection request. // -class ICE_API CallbackBase : public IceUtil::Shared +class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase { public: - void checkCallback(bool, bool); + ProxyGetConnection(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - virtual void completed(const ::Ice::AsyncResultPtr&) const = 0; - virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&) = 0; - virtual void sent(const ::Ice::AsyncResultPtr&) const = 0; - virtual bool hasSentCallback() const = 0; -}; + virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); -// -// Base class for generic callbacks. -// -class ICE_API GenericCallbackBase : virtual public CallbackBase -{ + void invoke(); }; +typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr; // -// Generic callback template that requires the caller to down-cast the -// proxy and the cookie that are obtained from the AsyncResult. +// Class for handling Ice::Connection::begin_flushBatchRequests // -template<class T> -class AsyncCallback : public GenericCallbackBase +class ICE_API ConnectionFlushBatch : public OutgoingAsyncBase { public: - typedef T callback_type; - typedef IceUtil::Handle<T> TPtr; - - typedef void (T::*Callback)(const ::Ice::AsyncResultPtr&); - - AsyncCallback(const TPtr& instance, Callback cb, Callback sentcb = 0) : - _callback(instance), _completed(cb), _sent(sentcb) - { - checkCallback(instance, cb != 0); - } - - virtual void completed(const ::Ice::AsyncResultPtr& result) const - { - (_callback.get()->*_completed)(result); - } - - virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&) - { - return this; // Nothing to do, the cookie is not type-safe. - } - - virtual void sent(const ::Ice::AsyncResultPtr& result) const - { - if(_sent) - { - (_callback.get()->*_sent)(result); - } - } + ConnectionFlushBatch(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&, + const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); + + virtual Ice::ConnectionPtr getConnection() const; - virtual bool hasSentCallback() const - { - return _sent != 0; - } + void invoke(); private: - TPtr _callback; - Callback _completed; - Callback _sent; -}; - -#ifdef ICE_CPP11 - -template<typename T> struct callback_type -{ - static const int value = 1; -}; - -template<> struct callback_type<void(const ::Ice::AsyncResultPtr&)> -{ - static const int value = 2; -}; - -template<> struct callback_type<void(const ::Ice::Exception&)> -{ - static const int value = 3; -}; - -template<typename Callable, typename = void> struct callable_type -{ - static const int value = 1; -}; - -template<class Callable> struct callable_type<Callable, typename ::std::enable_if< - ::std::is_class<Callable>::value && - !::std::is_bind_expression<Callable>::value>::type> -{ - template<typename T, T> struct TypeCheck; - template<typename T> struct AsyncResultCallback - { - typedef void (T::*ok)(const ::Ice::AsyncResultPtr&) const; - }; - template<typename T> struct ExceptionCallback - { - typedef void (T::*ok)(const ::Ice::Exception&) const; - }; - - typedef char (&other)[1]; - typedef char (&asyncResult)[2]; - typedef char (&exception)[3]; - - template<typename T> static other check(...); - template<typename T> static asyncResult check(TypeCheck<typename AsyncResultCallback<T>::ok, &T::operator()>*); - template<typename T> static exception check(TypeCheck<typename ExceptionCallback<T>::ok, &T::operator()>*); - - enum { value = sizeof(check<Callable>(0)) }; -}; - -template<> struct callable_type<void(*)(const ::Ice::AsyncResultPtr&)> -{ - static const int value = 2; -}; - -template<> struct callable_type<void(*)(const ::Ice::Exception&)> -{ - static const int value = 3; -}; - -template<typename Callable, typename Callback> struct is_callable -{ - static const bool value = callable_type<Callable>::value == callback_type<Callback>::value; + const Ice::ConnectionIPtr _connection; }; +typedef IceUtil::Handle<ConnectionFlushBatch> ConnectionFlushBatchPtr; -template<class S> class Function : public std::function<S> +// +// Class for handling Ice::Communicator::begin_flushBatchRequests +// +class ICE_API CommunicatorFlushBatch : public Ice::AsyncResult { - public: - template<typename T> Function(T f, typename ::std::enable_if<is_callable<T, S>::value>::type* = 0) - : std::function<S>(f) - { - } - - Function() - { - } - - Function(::std::nullptr_t) : ::std::function<S>(nullptr) - { - } -}; - -#endif - -} - -namespace Ice -{ + CommunicatorFlushBatch(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, + const CallbackBasePtr&, const Ice::LocalObjectPtr&); -typedef IceUtil::Handle< ::IceInternal::GenericCallbackBase> CallbackPtr; - -template<class T> CallbackPtr -newCallback(const IceUtil::Handle<T>& instance, - void (T::*cb)(const AsyncResultPtr&), - void (T::*sentcb)(const AsyncResultPtr&) = 0) -{ - return new ::IceInternal::AsyncCallback<T>(instance, cb, sentcb); -} + void flushConnection(const Ice::ConnectionIPtr&); + void ready(); -template<class T> CallbackPtr -newCallback(T* instance, - void (T::*cb)(const AsyncResultPtr&), - void (T::*sentcb)(const AsyncResultPtr&) = 0) -{ - return new ::IceInternal::AsyncCallback<T>(instance, cb, sentcb); -} +private: -#ifdef ICE_CPP11 + void check(bool); -ICE_API CallbackPtr -newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>&, - const ::IceInternal::Function<void (const AsyncResultPtr&)>& = - ::IceInternal::Function<void (const AsyncResultPtr&)>()); -#endif + int _useCount; +}; -// -// Operation callbacks are specified in Proxy.h -// } #endif diff --git a/cpp/include/Ice/OutgoingAsyncF.h b/cpp/include/Ice/OutgoingAsyncF.h index 9675fa6c468..811a588e0b3 100644 --- a/cpp/include/Ice/OutgoingAsyncF.h +++ b/cpp/include/Ice/OutgoingAsyncF.h @@ -14,45 +14,24 @@ #include <Ice/Handle.h> -namespace Ice -{ - -class AsyncResult; -ICE_API IceUtil::Shared* upCast(::Ice::AsyncResult*); -typedef IceInternal::Handle<AsyncResult> AsyncResultPtr; - -} - namespace IceInternal { +class OutgoingAsyncBase; +ICE_API IceUtil::Shared* upCast(OutgoingAsyncBase*); +typedef IceInternal::Handle<OutgoingAsyncBase> OutgoingAsyncBasePtr; + class OutgoingAsync; ICE_API IceUtil::Shared* upCast(OutgoingAsync*); typedef IceInternal::Handle<OutgoingAsync> OutgoingAsyncPtr; -class OutgoingAsyncMessageCallback; -ICE_API IceUtil::Shared* upCast(OutgoingAsyncMessageCallback*); -typedef IceInternal::Handle<OutgoingAsyncMessageCallback> OutgoingAsyncMessageCallbackPtr; - -class BatchOutgoingAsync; -ICE_API IceUtil::Shared* upCast(BatchOutgoingAsync*); -typedef IceInternal::Handle<BatchOutgoingAsync> BatchOutgoingAsyncPtr; - -class ProxyBatchOutgoingAsync; -ICE_API IceUtil::Shared* upCast(ProxyBatchOutgoingAsync*); -typedef IceInternal::Handle<ProxyBatchOutgoingAsync> ProxyBatchOutgoingAsyncPtr; - -class ConnectionBatchOutgoingAsync; -ICE_API IceUtil::Shared* upCast(ConnectionBatchOutgoingAsync*); -typedef IceInternal::Handle<ConnectionBatchOutgoingAsync> ConnectionBatchOutgoingAsyncPtr; - -class CommunicatorBatchOutgoingAsync; -ICE_API IceUtil::Shared* upCast(CommunicatorBatchOutgoingAsync*); -typedef IceInternal::Handle<CommunicatorBatchOutgoingAsync> CommunicatorBatchOutgoingAsyncPtr; +class ProxyOutgoingAsyncBase; +ICE_API IceUtil::Shared* upCast(ProxyOutgoingAsyncBase*); +typedef IceInternal::Handle<ProxyOutgoingAsyncBase> ProxyOutgoingAsyncBasePtr; -class GetConnectionOutgoingAsync; -ICE_API IceUtil::Shared* upCast(GetConnectionOutgoingAsync*); -typedef IceInternal::Handle<GetConnectionOutgoingAsync> GetConnectionOutgoingAsyncPtr; +class CommunicatorFlushBatch; +ICE_API IceUtil::Shared* upCast(CommunicatorFlushBatch*); +typedef IceInternal::Handle<CommunicatorFlushBatch> CommunicatorFlushBatchPtr; } diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index 6bbd1c7ffe7..0600916d324 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -21,7 +21,7 @@ #include <Ice/ObjectF.h> #include <Ice/ObjectAdapterF.h> #include <Ice/ReferenceF.h> -#include <Ice/OutgoingAsync.h> +#include <Ice/AsyncResult.h> //#include <Ice/RouterF.h> // Can't include RouterF.h here, otherwise we have cyclic includes //#include <Ice/LocatorF.h> // Can't include RouterF.h here, otherwise we have cyclic includes #include <Ice/Current.h> @@ -1378,8 +1378,6 @@ public: } }; - - template<class T, typename CT> class TwowayCallback : public Callback<T, CT> { diff --git a/cpp/include/Ice/RequestHandlerF.h b/cpp/include/Ice/RequestHandlerF.h index 09013ba6fc8..3d63656edc5 100644 --- a/cpp/include/Ice/RequestHandlerF.h +++ b/cpp/include/Ice/RequestHandlerF.h @@ -20,6 +20,10 @@ class RequestHandler; ICE_API IceUtil::Shared* upCast(RequestHandler*); typedef IceInternal::Handle<RequestHandler> RequestHandlerPtr; +class CancellationHandler; +ICE_API IceUtil::Shared* upCast(CancellationHandler*); +typedef IceInternal::Handle<CancellationHandler> CancellationHandlerPtr; + } #endif diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp new file mode 100644 index 00000000000..03abc55e020 --- /dev/null +++ b/cpp/src/Ice/AsyncResult.cpp @@ -0,0 +1,602 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/DisableWarnings.h> +#include <Ice/AsyncResult.h> +#include <Ice/ThreadPool.h> +#include <Ice/Instance.h> +#include <Ice/LoggerUtil.h> +#include <Ice/Properties.h> +#include <Ice/RequestHandler.h> +#include <Ice/OutgoingAsync.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; } + +const unsigned char Ice::AsyncResult::OK = 0x1; +const unsigned char Ice::AsyncResult::Done = 0x2; +const unsigned char Ice::AsyncResult::Sent = 0x4; +const unsigned char Ice::AsyncResult::EndCalled = 0x8; + +void +AsyncResult::cancel() +{ + cancel(InvocationCanceledException(__FILE__, __LINE__)); +} + +Int +AsyncResult::getHash() const +{ + return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); +} + +CommunicatorPtr +AsyncResult::getCommunicator() const +{ + return _communicator; +} + +ConnectionPtr +AsyncResult::getConnection() const +{ + return 0; +} + +ObjectPrx +AsyncResult::getProxy() const +{ + return 0; +} + +bool +AsyncResult::isCompleted() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & Done; +} + +void +AsyncResult::waitForCompleted() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & Done)) + { + _monitor.wait(); + } +} + +bool +AsyncResult::isSent() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & Sent; +} + +void +AsyncResult::waitForSent() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & Sent) && !_exception.get()) + { + _monitor.wait(); + } +} + +void +AsyncResult::throwLocalException() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_exception.get()) + { + _exception.get()->ice_throw(); + } +} + +bool +AsyncResult::sentSynchronously() const +{ + return _sentSynchronously; +} + +LocalObjectPtr +AsyncResult::getCookie() const +{ + return _cookie; +} + +const std::string& +AsyncResult::getOperation() const +{ + return _operation; +} + +void +AsyncResult::__throwUserException() +{ + try + { + _is.startReadEncaps(); + _is.throwException(); + } + catch(const Ice::UserException&) + { + _is.endReadEncaps(); + throw; + } +} + +bool +AsyncResult::__wait() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_state & EndCalled) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); + } + _state |= EndCalled; + while(!(_state & Done)) + { + _monitor.wait(); + } + if(_exception.get()) + { + _exception.get()->ice_throw(); + } + return _state & OK; +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +{ + __check(r, operation); + if(r->getProxy().get() != prx) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding begin_" + + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +{ + __check(r, operation); + if(r->getCommunicator().get() != com) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + + " does not match communicator that was used to call corresponding " + + "begin_" + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +{ + __check(r, operation); + if(r->getConnection().get() != con) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const string& operation) +{ + if(!r) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); + } + else if(&r->_operation != &operation) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + + " method: " + r->_operation); + } +} + +AsyncResult::AsyncResult(const CommunicatorPtr& communicator, + const IceInternal::InstancePtr& instance, + const string& op, + const CallbackBasePtr& del, + const LocalObjectPtr& cookie) : + _instance(instance), + _sentSynchronously(false), + _is(instance.get(), Ice::currentProtocolEncoding), + _communicator(communicator), + _operation(op), + _callback(del), + _cookie(cookie), + _state(0) +{ + if(!_callback) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); + } + const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie); +} + +AsyncResult::~AsyncResult() +{ +} + +bool +AsyncResult::sent(bool done) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(!_exception.get()); + + bool alreadySent = _state & Sent; + _state |= Sent; + if(done) + { + _state |= Done | OK; + _cancellationHandler = 0; + if(!_callback || !_callback->hasSentCallback()) + { + _observer.detach(); + } + } + + _monitor.notifyAll(); + return !alreadySent && _callback && _callback->hasSentCallback(); +} + +bool +AsyncResult::finished(bool ok) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + if(ok) + { + _state |= OK; + } + _cancellationHandler = 0; + if(!_callback) + { + _observer.detach(); + } + _monitor.notifyAll(); + return _callback; +} + +bool +AsyncResult::finished(const Ice::Exception& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _exception.reset(ex.ice_clone()); + _cancellationHandler = 0; + _observer.failed(ex.ice_name()); + if(!_callback) + { + _observer.detach(); + } + _monitor.notifyAll(); + return _callback; +} + +void +AsyncResult::invokeSentAsync() +{ + class AsynchronousSent : public DispatchWorkItem + { + public: + + AsynchronousSent(const ConnectionPtr& connection, const AsyncResultPtr& result) : + DispatchWorkItem(connection), _result(result) + { + } + + virtual void + run() + { + _result->invokeSent(); + } + + private: + + const AsyncResultPtr _result; + }; + + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + } +} + +void +AsyncResult::invokeCompletedAsync() +{ + class AsynchronousCompleted : public DispatchWorkItem + { + public: + + AsynchronousCompleted(const ConnectionPtr& connection, const AsyncResultPtr& result) : + DispatchWorkItem(connection), _result(result) + { + } + + virtual void + run() + { + _result->invokeCompleted(); + } + + private: + + const AsyncResultPtr _result; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousCompleted(_cachedConnection, this)); +} + +void +AsyncResult::invokeSent() +{ + assert(_callback); + + try + { + AsyncResultPtr self(this); + _callback->sent(self); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + if(_observer) + { + ObjectPrx proxy = getProxy(); + if(!proxy || !proxy->ice_isTwoway()) + { + _observer.detach(); + } + } +} + +void +AsyncResult::invokeCompleted() +{ + assert(_callback); + + try + { + AsyncResultPtr self(this); + _callback->completed(self); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +AsyncResult::cancel(const Ice::LocalException& ex) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _cancellationException.reset(ex.ice_clone()); + if(!_cancellationHandler) + { + return; + } + } + _cancellationHandler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex); +} + +void +AsyncResult::cancelable(const CancellationHandlerPtr& handler) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(!_cancellationException.get()) + { + _cancellationHandler = handler; + return; + } + } + handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get()); +} + +void +AsyncResult::checkCanceled() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_cancellationException.get()) + { + _cancellationException->ice_throw(); + } +} + +void +AsyncResult::warning(const std::exception& exc) const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(_instance->initializationData().logger); + const Exception* ex = dynamic_cast<const Exception*>(&exc); + if(ex) + { + out << "Ice::Exception raised by AMI callback:\n" << *ex; + } + else + { + out << "std::exception raised by AMI callback:\n" << exc.what(); + } + } +} + +void +AsyncResult::warning() const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(_instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; + } +} + + +namespace +{ + +// +// Dummy class derived from CallbackBase +// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, +// this allows us to test whether the user supplied a null delegate instance to the +// generated begin_ method without having to generate a separate test to throw IllegalArgumentException +// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated +// object code. +// +class DummyCallback : public CallbackBase +{ +public: + + DummyCallback() + { + } + + virtual void + completed(const Ice::AsyncResultPtr&) const + { + assert(false); + } + + virtual CallbackBasePtr + verify(const Ice::LocalObjectPtr&) + { + // + // Called by the AsyncResult constructor to verify the delegate. The dummy + // delegate is passed when the user used a begin_ method without delegate. + // By returning 0 here, we tell the AsyncResult that no delegates was + // provided. + // + return 0; + } + + virtual void + sent(const AsyncResultPtr&) const + { + assert(false); + } + + virtual bool + hasSentCallback() const + { + assert(false); + return false; + } +}; + +} + +// +// This gives a pointer value to compare against in the generated +// begin_ method to decide whether the caller passed a null pointer +// versus the generated inline version of the begin_ method having +// passed a pointer to the dummy delegate. +// +CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; + +#ifdef ICE_CPP11 + +Ice::CallbackPtr +Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed, + const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent) +{ + class Cpp11CB : public GenericCallbackBase + { + public: + + Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, + const ::std::function<void (const AsyncResultPtr&)>& sent) : + _completed(completed), + _sent(sent) + { + checkCallback(true, completed != nullptr); + } + + virtual void + completed(const AsyncResultPtr& result) const + { + _completed(result); + } + + virtual CallbackBasePtr + verify(const LocalObjectPtr&) + { + return this; // Nothing to do, the cookie is not type-safe. + } + + virtual void + sent(const AsyncResultPtr& result) const + { + if(_sent != nullptr) + { + _sent(result); + } + } + + virtual bool + hasSentCallback() const + { + return _sent != nullptr; + } + + private: + + ::std::function< void (const AsyncResultPtr&)> _completed; + ::std::function< void (const AsyncResultPtr&)> _sent; + }; + + return new Cpp11CB(completed, sent); +} +#endif + +void +IceInternal::CallbackBase::checkCallback(bool obj, bool cb) +{ + if(!obj) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); + } + if(!cb) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); + } +} + + diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 0b08198726a..3fc321735cb 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,6 +14,8 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> +#include <Ice/Outgoing.h> +#include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -28,7 +30,7 @@ class InvokeAll : public DispatchWorkItem { public: - InvokeAll(OutgoingMessageCallback* out, + InvokeAll(OutgoingBase* out, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -49,7 +51,7 @@ public: private: - OutgoingMessageCallback* _out; + OutgoingBase* _out; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -61,7 +63,7 @@ class InvokeAllAsync : public DispatchWorkItem { public: - InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync, + InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -82,7 +84,7 @@ public: private: - OutgoingAsyncMessageCallbackPtr _outAsync; + OutgoingAsyncBasePtr _outAsync; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -113,7 +115,7 @@ public: private: const CollocatedRequestHandlerPtr _handler; - const OutgoingAsyncMessageCallbackPtr _outAsync; + const OutgoingAsyncBasePtr _outAsync; BasicStream _stream; Int _invokeNum; }; @@ -261,24 +263,24 @@ CollocatedRequestHandler::abortBatchRequest() } bool -CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out) +CollocatedRequestHandler::sendRequest(OutgoingBase* out) { out->invokeCollocated(this); return !_response && _reference->getInvocationTimeout() == 0; } AsyncStatus -CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync) +CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync) { - return outAsync->__invokeCollocated(this); + return outAsync->invokeCollocated(this); } void -CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) { Lock sync(*this); - map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out); + map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); if(p != _sendRequests.end()) { if(p->second > 0) @@ -286,7 +288,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _requests.erase(p->second); } InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); _sendRequests.erase(p); return; } @@ -299,7 +301,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) if(q->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(q); return; // We're done. } @@ -307,12 +309,12 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) } } -void -CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +void +CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); - - map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); + + map<OutgoingAsyncBasePtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); if(p != _sendAsyncRequests.end()) { if(p->second > 0) @@ -320,7 +322,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac _asyncRequests.erase(p->second); } _sendAsyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } @@ -332,7 +337,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac if(q->second.get() == o.get()) { _asyncRequests.erase(q); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } } @@ -391,16 +399,17 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) { _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } + outAsync->cancelable(this); } - outAsync->__attachCollocatedObserver(_adapter, requestId); + outAsync->attachCollocatedObserver(_adapter, requestId); - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false)); return AsyncStatusQueued; } void -CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) +CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out) { Int invokeNum; { @@ -457,7 +466,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) } AsyncStatus -CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) +CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) { Int invokeNum; { @@ -473,10 +482,12 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) if(_reference->getInvocationTimeout() > 0) { _sendAsyncRequests.insert(make_pair(outAsync, 0)); + + outAsync->cancelable(this); } assert(!_batchStream.b.empty()); - _batchStream.swap(*outAsync->__getOs()); + _batchStream.swap(*outAsync->getOs()); // // Reset the batch stream. @@ -488,14 +499,14 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) } } - outAsync->__attachCollocatedObserver(_adapter, 0); + outAsync->attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true)); return AsyncStatusQueued; } - else if(outAsync->__sent()) + else if(outAsync->sent()) { return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); } @@ -512,7 +523,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) { Lock sync(*this); assert(_response); - + os->i = os->b.begin() + sizeof(replyHdr) + 4; if(_traceLevels->protocol >= 1) @@ -524,7 +535,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(*os); + p->second->completed(*os); _requests.erase(p); } else @@ -532,17 +543,21 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - os->swap(*q->second->__getIs()); - outAsync = q->second; + os->swap(*q->second->getIs()); + if(q->second->completed()) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } - if(outAsync && outAsync->__finished()) + if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } + _adapter->decDirectCount(); } @@ -563,12 +578,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException& void CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum) { - if(requestId > 0) - { - Lock sync(*this); - _requests.erase(requestId); - _asyncRequests.erase(requestId); - } + handleException(requestId, ex); _adapter->decDirectCount(); } @@ -585,7 +595,7 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingMessageCallback* out) +CollocatedRequestHandler::sent(OutgoingBase* out) { if(_reference->getInvocationTimeout() > 0) { @@ -600,7 +610,7 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out) } bool -CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) +CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { if(_reference->getInvocationTimeout() > 0) { @@ -610,9 +620,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) return false; // The request timed-out. } } - if(outAsync->__sent()) + if(outAsync->sent()) { - outAsync->__invokeSent(); + outAsync->invokeSent(); } return true; } @@ -684,7 +694,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(ex); + p->second->completed(ex); _requests.erase(p); } else @@ -692,13 +702,17 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - outAsync = q->second; + if(q->second->completed(ex)) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } + if(outAsync) { - outAsync->__finished(ex); + outAsync->invokeCompleted(); } } diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index a3ac1387045..3930c12ce1c 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -31,10 +31,10 @@ typedef IceUtil::Handle<ObjectAdapterI> ObjectAdapterIPtr; namespace IceInternal { +class OutgoingBase; class Outgoing; -class BatchOutgoing; +class OutgoingAsyncBase; class OutgoingAsync; -class BatchOutgoingAsync; class CollocatedRequestHandler : public RequestHandler, public ResponseHandler, private IceUtil::Monitor<IceUtil::Mutex> { @@ -50,11 +50,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte); virtual void sendNoResponse(); @@ -68,11 +68,11 @@ public: void invokeRequest(Outgoing*); AsyncStatus invokeAsyncRequest(OutgoingAsync*); - void invokeBatchRequests(BatchOutgoing*); - AsyncStatus invokeAsyncBatchRequests(BatchOutgoingAsync*); + void invokeBatchRequests(OutgoingBase*); + AsyncStatus invokeAsyncBatchRequests(OutgoingAsyncBase*); - bool sent(OutgoingMessageCallback*); - bool sentAsync(OutgoingAsyncMessageCallback*); + bool sent(OutgoingBase*); + bool sentAsync(OutgoingAsyncBase*); void invokeAll(BasicStream*, Ice::Int, Ice::Int, bool); @@ -88,8 +88,8 @@ private: int _requestId; - std::map<OutgoingMessageCallback*, Ice::Int> _sendRequests; - std::map<OutgoingAsyncMessageCallbackPtr, Ice::Int> _sendAsyncRequests; + std::map<OutgoingBase*, Ice::Int> _sendRequests; + std::map<OutgoingAsyncBasePtr, Ice::Int> _sendAsyncRequests; std::map<Ice::Int, Outgoing*> _requests; std::map<Ice::Int, OutgoingAsyncPtr> _asyncRequests; diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index 9b7ce8b8cd8..a7c2aee35d2 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -21,6 +21,7 @@ #include <Ice/DefaultsAndOverrides.h> #include <Ice/TraceLevels.h> #include <Ice/Router.h> +#include <Ice/OutgoingAsync.h> #include <IceUtil/Mutex.h> #include <IceUtil/MutexPtrLock.h> #include <IceUtil/UUID.h> @@ -198,8 +199,7 @@ Ice::CommunicatorI::getPluginManager() const void Ice::CommunicatorI::flushBatchRequests() { - AsyncResultPtr r = begin_flushBatchRequests(); - end_flushBatchRequests(r); + end_flushBatchRequests(begin_flushBatchRequests()); } AsyncResultPtr @@ -223,9 +223,8 @@ Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBa #ifdef ICE_CPP11 AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::CommunicatorI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC { @@ -268,8 +267,7 @@ const ::std::string __flushBatchRequests_name = "flushBatchRequests"; } AsyncResultPtr -Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, - const LocalObjectPtr& cookie) +Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie) { OutgoingConnectionFactoryPtr connectionFactory = _instance->outgoingConnectionFactory(); ObjectAdapterFactoryPtr adapterFactory = _instance->objectAdapterFactory(); @@ -278,8 +276,11 @@ Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePt // This callback object receives the results of all invocations // of Connection::begin_flushBatchRequests. // - CommunicatorBatchOutgoingAsyncPtr result = - new CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb, cookie); + CommunicatorFlushBatchPtr result = new CommunicatorFlushBatch(this, + _instance, + __flushBatchRequests_name, + cb, + cookie); connectionFactory->flushAsyncBatchRequests(result); adapterFactory->flushAsyncBatchRequests(result); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 5a9a84c83b1..7d7f4a9291c 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -49,7 +49,7 @@ class FlushSentRequests : public DispatchWorkItem { public: - FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : + FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncBasePtr>& callbacks) : DispatchWorkItem(connection), _callbacks(callbacks) { } @@ -57,15 +57,15 @@ public: virtual void run() { - for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) + for(vector<OutgoingAsyncBasePtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) { - (*p)->__invokeSent(); + (*p)->invokeSent(); } } private: - vector<OutgoingAsyncMessageCallbackPtr> _callbacks; + vector<OutgoingAsyncBasePtr> _callbacks; }; }; @@ -202,7 +202,7 @@ ConnectRequestHandler::abortBatchRequest() } bool -ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) +ConnectRequestHandler::sendRequest(OutgoingBase* out) { { Lock sync(*this); @@ -225,7 +225,7 @@ ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) } AsyncStatus -ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out) +ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { { Lock sync(*this); @@ -236,6 +236,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o Request req; req.outAsync = out; _requests.push_back(req); + out->cancelable(this); return AsyncStatusQueued; } } @@ -244,11 +245,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o throw RetryException(ex); } } - return out->__send(_connection, _compress, _response); + return out->send(_connection, _compress, _response); } void -ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { { Lock sync(*this); @@ -263,8 +264,7 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { if(p->out == out) { - Ice::InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); _requests.erase(p); return; } @@ -272,11 +272,11 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection->requestTimedOut(out); + _connection->requestCanceled(out, ex); } void -ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { { Lock sync(*this); @@ -292,14 +292,16 @@ ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPt if(p->outAsync.get() == outAsync.get()) { _requests.erase(p); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection->asyncRequestTimedOut(outAsync); + _connection->asyncRequestCanceled(outAsync, ex); } Ice::ConnectionIPtr @@ -451,7 +453,7 @@ ConnectRequestHandler::flushRequests() _flushing = true; } - vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks; + vector<OutgoingAsyncBasePtr> sentCallbacks; try { while(!_requests.empty()) // _requests is immutable when _flushing = true @@ -463,7 +465,7 @@ ConnectRequestHandler::flushRequests() } else if(req.outAsync) { - if(req.outAsync->__send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) + if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { sentCallbacks.push_back(req.outAsync); } @@ -551,11 +553,14 @@ ConnectRequestHandler::flushRequestsWithException() { if(p->out) { - p->out->finished(*_exception.get()); + p->out->completed(*_exception.get()); } else if(p->outAsync) { - p->outAsync->__finished(*_exception.get()); + if(p->outAsync->completed(*_exception.get())) + { + p->outAsync->invokeCompleted(); + } } else { diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index 095edda6123..c5bc6602766 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -42,11 +42,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); @@ -69,8 +69,8 @@ private: { } - OutgoingMessageCallback* out; - OutgoingAsyncMessageCallbackPtr outAsync; + OutgoingBase* out; + OutgoingAsyncBasePtr outAsync; BasicStream* os; }; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index f859ac3fa58..f1fc0380727 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -24,6 +24,7 @@ #include <Ice/RouterInfo.h> #include <Ice/LocalException.h> #include <Ice/Functional.h> +#include <Ice/OutgoingAsync.h> #include <IceUtil/Random.h> #include <iterator> @@ -432,7 +433,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad } void -IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { list<ConnectionIPtr> c; @@ -1357,7 +1358,7 @@ IceInternal::IncomingConnectionFactory::connections() const } void -IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index bd0bbe30804..92603a55b04 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -66,7 +66,7 @@ public: const CreateConnectionCallbackPtr&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&); private: @@ -178,7 +178,7 @@ public: EndpointIPtr endpoint() const; std::list<Ice::ConnectionIPtr> connections() const; - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&); // // Operations from EventHandler diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 1bdbeb12448..92dc4a1693d 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -242,7 +242,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } void -Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) +Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { assert((out || outAsync)); // Only requests can timeout. out = 0; @@ -253,7 +253,7 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) } else { - assert(!adopted && !stream); + assert(!adopted); } } @@ -273,25 +273,28 @@ Ice::ConnectionI::OutgoingMessage::sent() else if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - invokeSent = outAsync->__sent(); + invokeSent = outAsync->sent(); return invokeSent || receivedReply; #else - return outAsync->__sent(); + return outAsync->sent(); #endif } return false; } void -Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) +Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { if(out) { - out->finished(ex); + out->completed(ex); } else if(outAsync) { - outAsync->__finished(ex); + if(outAsync->completed(ex)) + { + outAsync->invokeCompleted(); + } } if(adopted) @@ -651,8 +654,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); // // Send the message. If it can't be sent without blocking the message is added @@ -685,7 +687,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { - BasicStream* os = out->__getOs(); + BasicStream* os = out->getOs(); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) @@ -731,8 +733,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); AsyncStatus status = AsyncStatusQueued; try @@ -747,6 +748,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _exception->ice_throw(); } + if(response || status & AsyncStatusQueued) + { + out->cancelable(this); // Notify the request that it's cancelable + } + if(response) { // @@ -961,7 +967,7 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name); + FlushBatch out(this, _instance.get(), __flushBatchRequests_name); out.invoke(); } @@ -986,9 +992,8 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR #ifdef ICE_CPP11 AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC @@ -1026,16 +1031,13 @@ Ice::ConnectionI::begin_flushBatchRequests( AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionBatchOutgoingAsyncPtr result = - new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie); - try - { - result->__invoke(); - } - catch(const LocalException& __ex) - { - result->__invokeExceptionAsync(__ex); - } + ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this, + _communicator, + _instance, + __flushBatchRequests_name, + cb, + cookie); + result->invoke(); return result; } @@ -1047,7 +1049,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } bool -Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) +Ice::ConnectionI::flushBatchRequests(OutgoingBase* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1075,12 +1077,10 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - _batchStream.swap(*out->os()); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); + // // Send the batch stream. // @@ -1109,7 +1109,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) } AsyncStatus -Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) +Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1125,7 +1125,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; - if(outAsync->__sent()) + if(outAsync->sent()) { status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); } @@ -1141,11 +1141,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif + _batchStream.swap(*outAsync->getOs()); - outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - - _batchStream.swap(*outAsync->__getOs()); + outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); // // Send the batch stream. @@ -1153,7 +1151,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) AsyncStatus status = AsyncStatusQueued; try { - OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0); + OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0); status = sendMessage(message); } catch(const Ice::LocalException& ex) @@ -1163,6 +1161,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + if(status & AsyncStatusQueued) + { + outAsync->cancelable(this); // Notify the request that it's cancelable. + } + // // Reset the batch stream. // @@ -1276,9 +1279,14 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) +Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->out == out) @@ -1302,16 +1310,15 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream. + o->canceled(true); // true = adopt the stream. } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); return; } } @@ -1321,8 +1328,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(_requestsHint != _requests.end() && _requestsHint->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(_requestsHint); _requestsHint = _requests.end(); } @@ -1332,8 +1338,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(p->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); assert(p != _requestsHint); _requests.erase(p); return; // We're done. @@ -1344,10 +1349,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } void -Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // + // NOTE: This isn't called from a thread pool thread. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->outAsync.get() == outAsync.get()) @@ -1365,25 +1378,29 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou _asyncRequests.erase(o->requestId); } } - + // // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream + o->canceled(true); // true = adopt the stream } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + sync.release(); + outAsync->invokeCompleted(); + } + return; } } - + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); if(o) { @@ -1393,19 +1410,25 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } - + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { if(p->second.get() == o.get()) { assert(p != _asyncRequestsHint); _asyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } } @@ -1972,18 +1995,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) if(p->invokeSent) { - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); } if(p->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } #else - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); #endif } ++dispatchedCount; @@ -1995,7 +2018,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); ++dispatchedCount; } @@ -2147,14 +2170,14 @@ Ice::ConnectionI::finish() { if(message->sent() && message->invokeSent) { - message->outAsync->__invokeSent(); + message->outAsync->invokeSent(); } if(message->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } _sendStreams.pop_front(); @@ -2164,7 +2187,7 @@ Ice::ConnectionI::finish() for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - o->finished(*_exception.get()); + o->completed(*_exception.get()); if(o->requestId) // Make sure finished isn't called twice. { if(o->out) @@ -2182,13 +2205,16 @@ Ice::ConnectionI::finish() for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get()); + p->second->completed(*_exception.get()); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get()); + if(q->second->completed(*_exception.get())) + { + q->second->invokeCompleted(); + } } _asyncRequests.clear(); @@ -3481,7 +3507,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(p != _requests.end()) { - p->second->finished(stream); + p->second->completed(stream); if(p == _requestsHint) { @@ -3508,7 +3534,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request _asyncRequests.erase(q); } - stream.swap(*outAsync->__getIs()); + stream.swap(*outAsync->getIs()); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // @@ -3522,7 +3548,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } - else if(outAsync->__finished()) + else if(outAsync->completed()) { ++dispatchCount; } @@ -3531,7 +3557,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request outAsync = 0; } #else - if(outAsync->__finished()) + if(outAsync->completed()) { ++dispatchCount; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 32c474d20da..3ecec79a247 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -30,6 +30,7 @@ #include <Ice/TraceLevelsF.h> #include <Ice/OutgoingAsyncF.h> #include <Ice/EventHandler.h> +#include <Ice/RequestHandler.h> #include <Ice/ResponseHandler.h> #include <Ice/Dispatcher.h> #include <Ice/ObserverHelper.h> @@ -43,8 +44,7 @@ namespace IceInternal { class Outgoing; -class BatchOutgoing; -class OutgoingMessageCallback; +class OutgoingBase; } @@ -56,6 +56,7 @@ class LocalException; class ConnectionI : public Connection, public IceInternal::EventHandler, public IceInternal::ResponseHandler, + public IceInternal::CancellationHandler, public IceUtil::Monitor<IceUtil::Mutex> { class Observer : public IceInternal::ObserverHelperT<Ice::Instrumentation::ConnectionObserver> @@ -89,7 +90,7 @@ public: { } - OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) : + OutgoingMessage(IceInternal::OutgoingBase* o, IceInternal::BasicStream* str, bool comp, int rid) : stream(str), out(o), compress(comp), requestId(rid), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) , isSent(false), invokeSent(false), receivedReply(false) @@ -97,7 +98,7 @@ public: { } - OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str, + OutgoingMessage(const IceInternal::OutgoingAsyncBasePtr& o, IceInternal::BasicStream* str, bool comp, int rid) : stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -107,13 +108,13 @@ public: } void adopt(IceInternal::BasicStream*); - void timedOut(bool); + void canceled(bool); bool sent(); - void finished(const Ice::LocalException&); + void completed(const Ice::LocalException&); IceInternal::BasicStream* stream; - IceInternal::OutgoingMessageCallback* out; - IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; + IceInternal::OutgoingBase* out; + IceInternal::OutgoingAsyncBasePtr outAsync; bool compress; int requestId; bool adopted; @@ -178,8 +179,8 @@ public: virtual void end_flushBatchRequests(const AsyncResultPtr&); - bool flushBatchRequests(IceInternal::BatchOutgoing*); - IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&); + bool flushBatchRequests(IceInternal::OutgoingBase*); + IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::OutgoingAsyncBasePtr&); virtual void setCallback(const ConnectionCallbackPtr&); virtual void setACM(const IceUtil::Optional<int>&, @@ -187,8 +188,8 @@ public: const IceUtil::Optional<ACMHeartbeat>&); virtual ACM getACM(); - void requestTimedOut(IceInternal::OutgoingMessageCallback*); - void asyncRequestTimedOut(const IceInternal::OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&); + virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&); virtual void sendResponse(Int, IceInternal::BasicStream*, Byte); virtual void sendNoResponse(); diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index 43dda1b88ed..a94d3e7180a 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -91,27 +91,27 @@ ConnectionRequestHandler::abortBatchRequest() } bool -ConnectionRequestHandler::sendRequest(OutgoingMessageCallback* out) +ConnectionRequestHandler::sendRequest(OutgoingBase* out) { return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response } AsyncStatus -ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out) +ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { - return out->__send(_connection, _compress, _response); + return out->send(_connection, _compress, _response); } void -ConnectionRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +ConnectionRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { - _connection->requestTimedOut(out); + _connection->requestCanceled(out, ex); } void -ConnectionRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +ConnectionRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { - _connection->asyncRequestTimedOut(outAsync); + _connection->asyncRequestCanceled(outAsync, ex); } Ice::ConnectionIPtr diff --git a/cpp/src/Ice/ConnectionRequestHandler.h b/cpp/src/Ice/ConnectionRequestHandler.h index 5ab5a4c9ea7..211e8f02819 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.h +++ b/cpp/src/Ice/ConnectionRequestHandler.h @@ -31,11 +31,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp index a8bb8c6f81a..e5a3aed5b9f 100644 --- a/cpp/src/Ice/Exception.cpp +++ b/cpp/src/Ice/Exception.cpp @@ -502,6 +502,13 @@ Ice::InvocationTimeoutException::ice_print(ostream& out) const } void +Ice::InvocationCanceledException::ice_print(ostream& out) const +{ + Exception::ice_print(out); + out << ":\ninvocation canceled"; +} + +void Ice::ProtocolException::ice_print(ostream& out) const { Exception::ice_print(out); diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index 1398ab5d07a..f44e0c977a8 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -57,6 +57,7 @@ SLICE_OBJS = BuiltinSequences.o \ OBJS = Acceptor.o \ ACM.o \ Application.o \ + AsyncResult.o \ Base64.o \ BasicStream.o \ Buffer.o \ diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak index 75cf8c26a68..fcc4d3eb3e9 100644 --- a/cpp/src/Ice/Makefile.mak +++ b/cpp/src/Ice/Makefile.mak @@ -59,6 +59,7 @@ WINDOWS_OBJS = .\DLLMain.obj OBJS = .\Acceptor.obj \ .\ACM.obj \ .\Application.obj \ + .\AsyncResult.obj \ .\Base64.obj \ .\BasicStream.obj \ .\Buffer.obj \ diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp index ca7938bcc55..35ca16ea03e 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.cpp +++ b/cpp/src/Ice/ObjectAdapterFactory.cpp @@ -211,7 +211,7 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a } void -IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) const +IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) const { list<ObjectAdapterIPtr> adapters; { diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h index 7faa8ef73f4..37b3853497c 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.h +++ b/cpp/src/Ice/ObjectAdapterFactory.h @@ -33,7 +33,7 @@ public: ::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrx&); ::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrx&); void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&) const; + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&) const; private: diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 422cdb8fd01..c9a05c091ca 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -743,7 +743,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrx& proxy) const } void -Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { vector<IncomingConnectionFactoryPtr> f; { diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index 3897c0ef557..30f499d96c0 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -87,7 +87,7 @@ public: bool isLocal(const ObjectPrx&) const; - void flushAsyncBatchRequests(const IceInternal::CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchPtr&); void updateConnectionObservers(); void updateThreadObservers(); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 66509a2bedc..4815b9796fb 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -7,36 +7,44 @@ // // ********************************************************************** +#include <IceUtil/DisableWarnings.h> #include <Ice/Outgoing.h> -#include <Ice/Object.h> -#include <Ice/CollocatedRequestHandler.h> #include <Ice/ConnectionI.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> -#include <Ice/Endpoint.h> -#include <Ice/LocalException.h> -#include <Ice/Protocol.h> #include <Ice/Instance.h> +#include <Ice/LocalException.h> #include <Ice/ReplyStatus.h> -#include <Ice/ProxyFactory.h> +#include <Ice/ImplicitContextI.h> using namespace std; using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; -IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, - const Context* context) : +OutgoingBase::OutgoingBase(Instance* instance, const string& operation) : + _os(instance, Ice::currentProtocolEncoding), _sent(false) +{ +} + +Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _mode(mode), - _observer(proxy, operation, context), _state(StateUnsent), _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _sent(false) + _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding) { checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); + _observer.attach(proxy, operation, context); + + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(invocationTimeout); + } + switch(_proxy->__reference()->getMode()) { case Reference::ModeTwoway: @@ -129,7 +137,66 @@ Outgoing::~Outgoing() } bool -IceInternal::Outgoing::invoke() +Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendRequest(this, compress, response); +} + +void +Outgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeRequest(this); +} + +void +Outgoing::sent() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_proxy->__reference()->getMode() != Reference::ModeTwoway) + { + _childObserver.detach(); + _state = StateOK; + } + _sent = true; + _monitor.notify(); + + // + // NOTE: At this point the stack allocated Outgoing object can be destroyed + // since the notify() on the monitor will release the thread waiting on the + // synchronous Ice call. + // +} + +void +Outgoing::completed(const Exception& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + //assert(_state <= StateInProgress); + if(_state > StateInProgress) + { + // + // Response was already received but message + // didn't get removed first from the connection + // send message queue so it's possible we can be + // notified of failures. In this case, ignore the + // failure and assume the outgoing has been sent. + // + assert(_state != StateFailed); + _sent = true; + _monitor.notify(); + return; + } + + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + + _state = StateFailed; + _exception.reset(ex.ice_clone()); + _monitor.notify(); +} + +bool +Outgoing::invoke() { assert(_state == StateUnsent); @@ -146,6 +213,11 @@ IceInternal::Outgoing::invoke() { try { + if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now()) + { + throw Ice::InvocationTimeoutException(__FILE__, __LINE__); + } + _state = StateInProgress; _exception.reset(0); _sent = false; @@ -164,19 +236,18 @@ IceInternal::Outgoing::invoke() // // If the handler says it's not finished, we wait until we're done. // - int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - if(invocationTimeout > 0) + if(_invocationTimeoutDeadline != IceUtil::Time()) { IceUtil::Time now = IceUtil::Time::now(); - IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout); + timedOut = now >= _invocationTimeoutDeadline; while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) { - _monitor.timedWait(deadline - now); + _monitor.timedWait(_invocationTimeoutDeadline - now); if((_state == StateInProgress || !_sent) && _state != StateFailed) { now = IceUtil::Time::now(); - timedOut = now >= deadline; + timedOut = now >= _invocationTimeoutDeadline; } } } @@ -191,15 +262,15 @@ IceInternal::Outgoing::invoke() if(timedOut) { - _handler->requestTimedOut(this); + _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); // // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. + // the timeout if there was a failure shortly before requestCanceled got called. // In this case, the exception should be set on the Outgoing. // IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!_exception.get()) + while(_state == StateInProgress) { _monitor.wait(); } @@ -223,11 +294,23 @@ IceInternal::Outgoing::invoke() { try { - int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt); - _observer.retried(); // Invocation is being retried. - if(interval > 0) + IceUtil::Time interval; + interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); + if(interval > IceUtil::Time()) + { + if(_invocationTimeoutDeadline != IceUtil::Time()) + { + IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now(); + if(deadline < interval) + { + interval = deadline; + } + } + IceUtil::ThreadControl::sleep(interval); + } + if(_invocationTimeoutDeadline == IceUtil::Time() || _invocationTimeoutDeadline > IceUtil::Time::now()) { - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); + _observer.retried(); } } catch(const Ice::Exception& ex) @@ -243,7 +326,7 @@ IceInternal::Outgoing::invoke() } void -IceInternal::Outgoing::abort(const LocalException& ex) +Outgoing::abort(const LocalException& ex) { assert(_state == StateUnsent); @@ -261,67 +344,8 @@ IceInternal::Outgoing::abort(const LocalException& ex) ex.ice_throw(); } -bool -IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response); -} - -void -IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this); -} - -void -IceInternal::Outgoing::sent() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_proxy->__reference()->getMode() != Reference::ModeTwoway) - { - _childObserver.detach(); - _state = StateOK; - } - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated Outgoing object can be destroyed - // since the notify() on the monitor will release the thread waiting on the - // synchronous Ice call. - // -} - void -IceInternal::Outgoing::finished(const Exception& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - _monitor.notify(); - return; - } - - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - - _state = StateFailed; - _exception.reset(ex.ice_clone()); - _monitor.notify(); -} - -void -IceInternal::Outgoing::finished(BasicStream& is) +Outgoing::completed(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -482,7 +506,7 @@ IceInternal::Outgoing::finished(BasicStream& is) } void -IceInternal::Outgoing::throwUserException() +Outgoing::throwUserException() { try { @@ -496,27 +520,22 @@ IceInternal::Outgoing::throwUserException() } } -IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) : - _proxy(proxy), - _connection(0), - _sent(false), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _observer(proxy, name, 0) +FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0) { checkSupportedProtocol(proxy->__reference()->getProtocol()); + + _observer.attach(proxy->__reference()->getInstance().get(), operation); } -IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) : - _proxy(0), - _connection(connection), - _sent(false), - _os(instance, Ice::currentProtocolEncoding), - _observer(instance, name) +FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : + OutgoingBase(instance, operation), _proxy(0), _connection(connection) { + _observer.attach(instance, operation); } void -IceInternal::BatchOutgoing::invoke() +FlushBatch::invoke() { assert(_proxy || _connection); @@ -577,7 +596,8 @@ IceInternal::BatchOutgoing::invoke() if(timedOut) { - handler->requestTimedOut(this); + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + handler->requestCanceled(this, ex); // // Wait for the exception to propagate. It's possible the request handler ignores @@ -614,19 +634,19 @@ IceInternal::BatchOutgoing::invoke() } bool -IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool) +FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool) { return connection->flushBatchRequests(this); } void -IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler) +FlushBatch::invokeCollocated(CollocatedRequestHandler* handler) { handler->invokeBatchRequests(this); } void -IceInternal::BatchOutgoing::sent() +FlushBatch::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.detach(); @@ -635,14 +655,14 @@ IceInternal::BatchOutgoing::sent() _monitor.notify(); // - // NOTE: At this point the stack allocated BatchOutgoing object + // NOTE: At this point the stack allocated FlushBatch object // can be destroyed since the notify() on the monitor will release // the thread waiting on the synchronous Ice call. // } void -IceInternal::BatchOutgoing::finished(const Ice::Exception& ex) +FlushBatch::completed(const Ice::Exception& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.failed(ex.ice_name()); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index fb57965b60f..b348b1c8cc8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -9,18 +9,11 @@ #include <IceUtil/DisableWarnings.h> #include <Ice/OutgoingAsync.h> -#include <Ice/Object.h> #include <Ice/ConnectionI.h> #include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/LocalException.h> -#include <Ice/Properties.h> -#include <Ice/LoggerUtil.h> -#include <Ice/LocatorInfo.h> -#include <Ice/ProxyFactory.h> -#include <Ice/RouterInfo.h> -#include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> @@ -30,431 +23,289 @@ using namespace std; using namespace Ice; using namespace IceInternal; -IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; } - -IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; } +IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } +IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } - -const unsigned char Ice::AsyncResult::OK = 0x1; -const unsigned char Ice::AsyncResult::Done = 0x2; -const unsigned char Ice::AsyncResult::Sent = 0x4; -const unsigned char Ice::AsyncResult::EndCalled = 0x8; - -namespace -{ - -class AsynchronousException : public DispatchWorkItem -{ -public: - - AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, - const Ice::Exception& ex) : - DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone()) - { - } - - virtual void - run() - { - _result->__invokeException(*_exception.get()); - } - -private: - - const Ice::AsyncResultPtr _result; - const IceUtil::UniquePtr<Ice::Exception> _exception; -}; - -class AsynchronousSent : public DispatchWorkItem -{ -public: - - AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) : - DispatchWorkItem(connection), _result(result) - { - } - - virtual void - run() - { - _result->__invokeSent(); - } +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; } -private: - - const Ice::AsyncResultPtr _result; -}; - -}; - -Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator, - const IceInternal::InstancePtr& instance, - const string& op, - const CallbackBasePtr& del, - const LocalObjectPtr& cookie) : - _communicator(communicator), - _instance(instance), - _operation(op), - _callback(del), - _cookie(cookie), - _is(instance.get(), Ice::currentProtocolEncoding), - _os(instance.get(), Ice::currentProtocolEncoding), - _state(0), - _sentSynchronously(false), - _exception(0) +bool +OutgoingAsyncBase::sent() { - if(!_callback) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); - } - const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie); + return sent(true); } -Ice::AsyncResult::~AsyncResult() +bool +OutgoingAsyncBase::completed(const Exception& ex) { + return finished(ex); } -Int -Ice::AsyncResult::getHash() const +OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, delegate, cookie), + _os(instance.get(), Ice::currentProtocolEncoding) { - return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); } bool -Ice::AsyncResult::isCompleted() const +OutgoingAsyncBase::sent(bool done) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Done; -} - -void -Ice::AsyncResult::waitForCompleted() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Done)) + if(done) { - _monitor.wait(); + _childObserver.detach(); } + return AsyncResult::sent(done); } bool -Ice::AsyncResult::isSent() const -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Sent; -} - -void -Ice::AsyncResult::waitForSent() +OutgoingAsyncBase::finished(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Sent) && !_exception.get()) + if(_childObserver) { - _monitor.wait(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); } + return AsyncResult::finished(ex); } -void -Ice::AsyncResult::throwLocalException() const +Ice::ObjectPrx +ProxyOutgoingAsyncBase::getProxy() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_exception.get()) - { - _exception.get()->ice_throw(); - } + return _proxy; } bool -Ice::AsyncResult::__wait() +ProxyOutgoingAsyncBase::sent() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_state & EndCalled) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); - } - _state |= EndCalled; - while(!(_state & Done)) - { - _monitor.wait(); - } - if(_exception.get()) - { - _exception.get()->ice_throw(); - } - return _state & OK; + return sent(!_proxy->ice_isTwoway()); // Done if it's not a two-way proxy (no response expected). } -void -Ice::AsyncResult::__throwUserException() +bool +ProxyOutgoingAsyncBase::completed(const Exception& exc) { - try - { - _is.startReadEncaps(); - _is.throwException(); - } - catch(const Ice::UserException&) + if(_childObserver) { - _is.endReadEncaps(); - throw; + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); } -} -void -Ice::AsyncResult::__invokeSent() -{ // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. // - - if(_callback) + try { - try - { - AsyncResultPtr self(this); - _callback->sent(self); - } - catch(const std::exception& ex) - { - __warning(ex); - } - catch(...) - { - __warning(); - } + _instance->retryQueue()->add(this, handleException(exc)); + return false; } - - if(_observer) + catch(const Exception& ex) { - Ice::ObjectPrx proxy = getProxy(); - if(!proxy || !proxy->ice_isTwoway()) - { - _observer.detach(); - } + return finished(ex); // No retries, we're done } } void -Ice::AsyncResult::__invokeSentAsync() +ProxyOutgoingAsyncBase::retry() { - // - // This is called when it's not safe to call the sent callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); - } - catch(const Ice::CommunicatorDestroyedException&) - { - } + invokeImpl(false); } void -Ice::AsyncResult::__invokeException(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) { + assert(!_childObserver); + + if(finished(ex)) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); + invokeCompletedAsync(); + } + else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) + { + // + // If it's a communicator destroyed exception, don't swallow + // it but instead notify the user thread. Even if no callback + // was provided. + // + ex.ice_throw(); } - - __invokeCompleted(); } -void -Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), + _proxy(prx), + _mode(Normal), + _cnt(0), + _sent(false) { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - // CommunicatorDestroyedException is the only exception that can propagate directly - // from this method. - // - _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex)); } void -Ice::AsyncResult::__invokeCompleted() +ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback) + try { - try + if(userThread) { - AsyncResultPtr self(this); - _callback->completed(self); + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + } } - catch(const std::exception& ex) + else { - __warning(ex); + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); } - catch(...) + + while(true) { - __warning(); + try + { + _sent = false; + _handler = _proxy->__getRequestHandler(); + AsyncStatus status = _handler->sendAsyncRequest(this); + if(status & AsyncStatusSent) + { + if(userThread) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); // Call the sent callback from the user thread. + } + } + else + { + if(status & AsyncStatusInvokeSentCallback) + { + invokeSentAsync(); // Call the sent callback from a client thread pool thread. + } + } + } + return; // We're done! + } + catch(const RetryException& ex) + { + handleRetryException(ex); + } + catch(const Exception& ex) + { + if(_childObserver) + { + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + } + int interval = handleException(ex); + if(interval > 0) + { + _instance->retryQueue()->add(this, interval); + return; + } + else + { + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); + } + } } } - - _observer.detach(); -} - -void -Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() -{ - RequestHandlerPtr handler; + catch(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - swap(handler, _timeoutRequestHandler); - } - - if(handler) - { - handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this)); + // + // If called from the user thread we re-throw, the exception + // will be catch by the caller and abort() will be called. + // + if(userThread) + { + throw; + } + else if(finished(ex)) // No retries, we're done + { + invokeCompletedAsync(); + } } } - -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +bool +ProxyOutgoingAsyncBase::sent(bool done) { - __check(r, operation); - if(r->getProxy().get() != prx) + _sent = true; + if(done) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); + if(_proxy->__reference()->getInvocationTimeout() > 0) + { + _instance->timer()->cancel(this); + } } + return OutgoingAsyncBase::sent(done); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(const Exception& ex) { - __check(r, operation); - if(r->getCommunicator().get() != com) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + - " does not match communicator that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return OutgoingAsyncBase::finished(ex); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(bool ok) { - __check(r, operation); - if(r->getConnection().get() != con) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + - " does not match connection that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return AsyncResult::finished(ok); } void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation) +ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc) { - if(!r) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); - } - else if(&r->_operation != &operation) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + - " method: " + r->_operation); - } + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry. } - -void -Ice::AsyncResult::__warning(const std::exception& exc) const +int +ProxyOutgoingAsyncBase::handleException(const Exception& exc) { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - Warning out(_instance->initializationData().logger); - const Exception* ex = dynamic_cast<const Exception*>(&exc); - if(ex) - { - out << "Ice::Exception raised by AMI callback:\n" << *ex; - } - else - { - out << "std::exception raised by AMI callback:\n" << exc.what(); - } - } + return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); } void -Ice::AsyncResult::__warning() const +ProxyOutgoingAsyncBase::runTimerTask() { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + try { - Warning out(_instance->initializationData().logger); - out << "unknown exception raised by AMI callback"; + cancel(InvocationTimeoutException(__FILE__, __LINE__)); } -} - -void -IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool, - const Ice::ConnectionPtr& connection) -{ - class InvocationTimeoutCall : public DispatchWorkItem + catch(const CommunicatorDestroyedException&) { - public: - - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : - DispatchWorkItem(connection), _outAsync(outAsync) - { - } - - virtual void - run() - { - InvocationTimeoutException ex(__FILE__, __LINE__); - _outAsync->__finished(ex); - } - - private: - - const OutgoingAsyncMessageCallbackPtr _outAsync; - }; - threadPool->dispatch(new InvocationTimeoutCall(this, connection)); + } } -IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), +OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) { } void -IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context) +OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context) { - _handler = 0; - _cnt = 0; - _sent = false; - _mode = mode; - _sentSynchronously = false; - checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + _mode = mode; _observer.attach(_proxy.get(), operation, context); switch(_proxy->__reference()->getMode()) @@ -482,7 +333,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. } - catch(const Ice::LocalException& ex) + catch(const LocalException& ex) { _observer.failed(ex.ice_name()); _proxy->__setRequestHandler(_handler, 0); // Clear request handler @@ -541,109 +392,63 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod } AsyncStatus -IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response) { _cachedConnection = connection; return connection->sendAsyncRequest(this, compress, response); } AsyncStatus -IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) { return handler->invokeAsyncRequest(this); } -bool -IceInternal::OutgoingAsync::__sent() +void +OutgoingAsync::abort(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - - bool alreadySent = _state & Sent; // Expected in case of a retry. - _state |= Sent; - _sent = true; - - assert(!(_state & Done)); - if(!_proxy->ice_isTwoway()) + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - _childObserver.detach(); - if(!_callback || !_callback->hasSentCallback()) + if(_handler) { - _observer.detach(); - } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _handler->abortBatchRequest(); } - _state |= Done | OK; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. } - _monitor.notifyAll(); - return !alreadySent && _callback && _callback->hasSentCallback(); -} - -void -IceInternal::OutgoingAsync::__invokeSent() -{ - ::Ice::AsyncResult::__invokeSent(); + + ProxyOutgoingAsyncBase::abort(ex); } void -IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) +OutgoingAsync::invoke() { + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!(_state & Done)); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) + if(_handler) { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + _sentSynchronously = true; + _handler->finishBatchRequest(&_os); + finished(true); } + return; // Don't call sent/completed callback for batch AMI requests } // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. // - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex) -{ - if((_state & Done) == 0 && _handler) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _handler->abortBatchRequest(); - } - } - AsyncResult::__invokeExceptionAsync(ex); -} - -void -IceInternal::OutgoingAsync::__processRetry() -{ - __invoke(false); + invokeImpl(true); // userThread = true } bool -IceInternal::OutgoingAsync::__finished() +OutgoingAsync::completed() { // // NOTE: this method is called from ConnectionI.parseMessage @@ -652,25 +457,15 @@ IceInternal::OutgoingAsync::__finished() // assert(_proxy->ice_isTwoway()); // Can only be called for twoways. - Ice::Byte replyStatus; - try + if(_childObserver) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get() && !(_state & Done)); - assert(!_is.b.empty()); - - if(_childObserver) - { - _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); - } + _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); _childObserver.detach(); + } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - + Byte replyStatus; + try + { _is.read(replyStatus); switch(replyStatus) @@ -789,373 +584,197 @@ IceInternal::OutgoingAsync::__finished() } } - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - if(replyStatus == replyOK) - { - _state |= OK; - } - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; + return finished(replyStatus == replyOK); } - catch(const LocalException& exc) + catch(const Exception& ex) { - // - // We don't call finished(exc) here because we don't want - // to invoke the completion callback. The completion - // callback is invoked by the connection is this method - // returns true. - // - try - { - handleException(exc); - return false; // Invocation will be retried. - } - catch(const Ice::Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; - } + return completed(ex); } } +ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + bool -IceInternal::OutgoingAsync::__invoke(bool userThread) +ProxyFlushBatch::sent() { - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state |= Done | OK; - _handler->finishBatchRequest(&_os); - _observer.detach(); - return true; - } + return ProxyOutgoingAsyncBase::sent(true); // Overriden because the flush is done even if using a two-way proxy. +} - while(true) - { - try - { - _sent = false; - _handler = _proxy->__getRequestHandler(); - AsyncStatus status = _handler->sendAsyncRequest(this); - if(status & AsyncStatusSent) - { - if(userThread) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSent(); // Call the sent callback from the user thread. - } - } - else - { - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSentAsync(); // Call the sent callback from a client thread pool thread. - } - } - } +AsyncStatus +ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool) +{ + _cachedConnection = connection; + return connection->flushAsyncBatchRequests(this); +} - if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent)) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = _handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = _handler; - } - } - } - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. - continue; - } - catch(const Ice::Exception& ex) - { - handleException(ex); - } - break; - } - return _sentSynchronously; +AsyncStatus +ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncBatchRequests(this); } void -IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) +ProxyFlushBatch::invoke() { - try - { - int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); - _observer.retried(); // Invocation is being retried. - - // - // Schedule the retry. Note that we always schedule the retry - // on the retry queue even if the invocation can be retried - // immediately. This is required because it might not be safe - // to retry from this thread (this is for instance called by - // finished(BasicStream) which is called with the connection - // locked. - // - _instance->retryQueue()->add(this, interval); - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } + checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + invokeImpl(true); // userThread = true } -IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) +void +ProxyFlushBatch::handleRetryException(const RetryException& ex) { + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests } -AsyncStatus -IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) +int +ProxyFlushBatch::handleException(const Exception& ex) { - _cachedConnection = connection; - return connection->flushAsyncBatchRequests(this); + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests + return 0; } -AsyncStatus -IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) { - return handler->invokeAsyncBatchRequests(this); + _observer.attach(prx.get(), operation, 0); } -bool -IceInternal::BatchOutgoingAsync::__sent() +AsyncStatus +ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get()); - _state |= Done | OK | Sent; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - _monitor.notifyAll(); - if(!_callback || !_callback->hasSentCallback()) + _cachedConnection = connection; + if(finished(true)) { - _observer.detach(); - return false; + invokeCompletedAsync(); } - return true; + return AsyncStatusSent; } -void -IceInternal::BatchOutgoingAsync::__invokeSent() +AsyncStatus +ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) { - ::Ice::AsyncResult::__invokeSent(); + if(finished(true)) + { + invokeCompletedAsync(); + } + return AsyncStatusSent; } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc) +ProxyGetConnection::invoke() { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - } - __invokeException(exc); + invokeImpl(true); // userThread = true } -void -IceInternal::BatchOutgoingAsync::__processRetry() +ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) { - assert(false); // Retries are never scheduled + _observer.attach(instance.get(), operation); } -IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie), - _proxy(proxy) +ConnectionPtr +ConnectionFlushBatch::getConnection() const { - _observer.attach(proxy.get(), operation, 0); + return _connection; } void -IceInternal::ProxyBatchOutgoingAsync::__invoke() +ConnectionFlushBatch::invoke() { - checkSupportedProtocol(_proxy->__reference()->getProtocol()); - - RequestHandlerPtr handler; try { - handler = _proxy->__getRequestHandler(); - AsyncStatus status = handler->sendAsyncRequest(this); + AsyncStatus status = _connection->flushAsyncBatchRequests(this); if(status & AsyncStatusSent) { _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __invokeSent(); - } - } - else - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = handler; - } + invokeSent(); } } } - catch(const RetryException&) - { - // - // Clear request handler but don't retry or throw. Retrying - // isn't useful, there were no batch requests associated with - // the proxy's request handler. - // - _proxy->__setRequestHandler(handler, 0); - } - catch(const Ice::Exception& ex) + catch(const Exception& ex) { - _observer.failed(ex.ice_name()); - _proxy->__setRequestHandler(handler, 0); // Clear request handler - throw; // Throw to notify the user that batch requests were potentially lost. - } -} - -IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(communicator, instance, operation, delegate, cookie), - _connection(con) -{ - _observer.attach(instance.get(), operation); -} - -void -IceInternal::ConnectionBatchOutgoingAsync::__invoke() -{ - AsyncStatus status = _connection->flushAsyncBatchRequests(this); - if(status & AsyncStatusSent) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) + if(completed(ex)) { - __invokeSent(); + invokeCompletedAsync(); } } } -Ice::ConnectionPtr -IceInternal::ConnectionBatchOutgoingAsync::getConnection() const +CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, cb, cookie) { - return _connection; -} + _observer.attach(instance.get(), operation); -IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) -{ // // _useCount is initialized to 1 to prevent premature callbacks. // The caller must invoke ready() after all flush requests have // been initiated. // _useCount = 1; - - // - // Assume all connections are flushed synchronously. - // - _sentSynchronously = true; - - // - // Attach observer - // - _observer.attach(instance.get(), operation); } void -IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con) +CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) { - class BatchOutgoingAsyncI : public BatchOutgoingAsync + class FlushBatch : public OutgoingAsyncBase { public: - - BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, - InvocationObserver& observer) : - BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), - _outAsync(outAsync), _observer(observer) + + FlushBatch(const CommunicatorFlushBatchPtr& outAsync, + const InstancePtr& instance, + InvocationObserver& observer) : + OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), + _outAsync(outAsync), + _observer(observer) { } - virtual bool __sent() + virtual bool sent() { _childObserver.detach(); _outAsync->check(false); return false; } -#ifdef __SUNPRO_CC - using BatchOutgoingAsync::__sent; -#endif - - virtual void __finished(const Ice::Exception& ex) + virtual bool completed(const Exception& ex) { _childObserver.failed(ex.ice_name()); _childObserver.detach(); _outAsync->check(false); + return false; } - virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, - Ice::Int requestId, Ice::Int sz) + private: + + virtual InvocationObserver& getObserver() { - _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); + return _observer; } - private: - - const CommunicatorBatchOutgoingAsyncPtr _outAsync; + const CommunicatorFlushBatchPtr _outAsync; InvocationObserver& _observer; }; @@ -1166,13 +785,9 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt try { - AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer)); - if(!(status & AsyncStatusSent)) - { - _sentSynchronously = false; - } + con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer)); } - catch(const Ice::LocalException&) + catch(const LocalException&) { check(false); throw; @@ -1180,19 +795,13 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } void -IceInternal::CommunicatorBatchOutgoingAsync::ready() +CommunicatorFlushBatch::ready() { check(true); } void -IceInternal::CommunicatorBatchOutgoingAsync::__processRetry() -{ - assert(false); // Retries are never scheduled -} - -void -IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) +CommunicatorFlushBatch::check(bool userThread) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -1201,255 +810,18 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) { return; } - _state |= Done | OK | Sent; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _monitor.notifyAll(); } - if(!_callback || !_callback->hasSentCallback()) + if(sent(true)) { - _observer.detach(); - } - else - { - // - // _sentSynchronously is immutable here. - // - if(!_sentSynchronously || !userThread) + if(userThread) { - __invokeSentAsync(); + _sentSynchronously = true; + invokeSent(); } else { - AsyncResult::__invokeSent(); - } - } -} - -IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), - _cnt(0) -{ - _observer.attach(prx.get(), operation, 0); -} - -void -IceInternal::GetConnectionOutgoingAsync::__invoke() -{ - while(true) - { - try - { - _handler = _proxy->__getRequestHandler(); - _handler->sendAsyncRequest(this); - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); - } - catch(const Ice::Exception& ex) - { - handleException(ex); + invokeSentAsync(); } - break; - } -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) -{ - __sent(); - return AsyncStatusSent; -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) -{ - __sent(); - return AsyncStatusSent; -} - -bool -IceInternal::GetConnectionOutgoingAsync::__sent() -{ - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _monitor.notifyAll(); - } - __invokeCompleted(); - return false; -} - -void -IceInternal::GetConnectionOutgoingAsync::__invokeSent() -{ - // No sent callback -} - -void -IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) -{ - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::GetConnectionOutgoingAsync::__processRetry() -{ - __invoke(); -} - -void -IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) -{ - try - { - _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); - _observer.retried(); // Invocation is being retried. - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } -} - -namespace -{ - -// -// Dummy class derived from CallbackBase -// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, -// this allows us to test whether the user supplied a null delegate instance to the -// generated begin_ method without having to generate a separate test to throw IllegalArgumentException -// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated -// object code. -// -class DummyCallback : public CallbackBase -{ -public: - - DummyCallback() - { - } - - virtual void - completed(const Ice::AsyncResultPtr&) const - { - assert(false); - } - - virtual CallbackBasePtr - verify(const Ice::LocalObjectPtr&) - { - // - // Called by the AsyncResult constructor to verify the delegate. The dummy - // delegate is passed when the user used a begin_ method without delegate. - // By returning 0 here, we tell the AsyncResult that no delegates was - // provided. - // - return 0; - } - - virtual void - sent(const AsyncResultPtr&) const - { - assert(false); - } - - virtual bool - hasSentCallback() const - { - assert(false); - return false; - } -}; - -} - -// -// This gives a pointer value to compare against in the generated -// begin_ method to decide whether the caller passed a null pointer -// versus the generated inline version of the begin_ method having -// passed a pointer to the dummy delegate. -// -CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; - -#ifdef ICE_CPP11 - -Ice::CallbackPtr -Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent) -{ - class Cpp11CB : public GenericCallbackBase - { - public: - - Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, - const ::std::function<void (const AsyncResultPtr&)>& sent) : - _completed(completed), - _sent(sent) - { - checkCallback(true, completed != nullptr); - } - - virtual void - completed(const AsyncResultPtr& result) const - { - _completed(result); - } - - virtual CallbackBasePtr - verify(const LocalObjectPtr&) - { - return this; // Nothing to do, the cookie is not type-safe. - } - - virtual void - sent(const AsyncResultPtr& result) const - { - if(_sent != nullptr) - { - _sent(result); - } - } - - virtual bool - hasSentCallback() const - { - return _sent != nullptr; - } - - private: - - ::std::function< void (const AsyncResultPtr&)> _completed; - ::std::function< void (const AsyncResultPtr&)> _sent; - }; - - return new Cpp11CB(completed, sent); -} -#endif - -void -IceInternal::CallbackBase::checkCallback(bool obj, bool cb) -{ - if(!obj) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); - } - if(!cb) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); } } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index f455f5e6361..13bd0120a8f 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -217,15 +217,15 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_isA_name, del, cookie); try { - __result->__prepare(ice_isA_name, Nonmutating, ctx); - IceInternal::BasicStream* __os = __result->__startWriteParams(DefaultFormat); + __result->prepare(ice_isA_name, Nonmutating, ctx); + IceInternal::BasicStream* __os = __result->startWriteParams(DefaultFormat); __os->write(typeId); - __result->__endWriteParams(); - __result->__invoke(true); + __result->endWriteParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -233,12 +233,11 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, #ifdef ICE_CPP11 Ice::AsyncResultPtr -IceProxy::Ice::Object::__begin_ice_isA( - const ::std::string& typeId, - const ::Ice::Context* ctx, - const ::IceInternal::Function<void (bool)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, - const ::IceInternal::Function<void (bool)>& sent) +IceProxy::Ice::Object::__begin_ice_isA(const ::std::string& typeId, + const ::Ice::Context* ctx, + const ::IceInternal::Function<void (bool)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, + const ::IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC { @@ -281,11 +280,10 @@ IceProxy::Ice::Object::__begin_ice_isA( } Ice::AsyncResultPtr -IceProxy::Ice::Object::__begin_ice_id( - const ::Ice::Context* ctx, - const ::IceInternal::Function<void (const ::std::string&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, - const ::IceInternal::Function<void (bool)>& sent) +IceProxy::Ice::Object::__begin_ice_id(const ::Ice::Context* ctx, + const ::IceInternal::Function<void (const ::std::string&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, + const ::IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC { @@ -573,13 +571,13 @@ IceProxy::Ice::Object::begin_ice_ping(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ping_name, del, cookie); try { - __result->__prepare(ice_ping_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_ping_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -647,13 +645,13 @@ IceProxy::Ice::Object::begin_ice_ids(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ids_name, del, cookie); try { - __result->__prepare(ice_ids_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_ids_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -690,13 +688,13 @@ IceProxy::Ice::Object::begin_ice_id(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_id_name, del, cookie); try { - __result->__prepare(ice_id_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_id_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -818,13 +816,13 @@ IceProxy::Ice::Object::begin_ice_invoke(const string& operation, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_invoke_name, del, cookie); try { - __result->__prepare(operation, mode, ctx); - __result->__writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); - __result->__invoke(true); + __result->prepare(operation, mode, ctx); + __result->writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -1389,17 +1387,16 @@ AsyncResultPtr IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - ::IceInternal::GetConnectionOutgoingAsyncPtr __result = - new ::IceInternal::GetConnectionOutgoingAsync(this, ice_getConnection_name, del, cookie); + ProxyGetConnectionPtr result = new ProxyGetConnection(this, ice_getConnection_name, del, cookie); try { - __result->__invoke(); + result->invoke(); } - catch(const Exception& __ex) + catch(const Exception& ex) { - __result->__invokeExceptionAsync(__ex); + result->abort(ex); } - return __result; + return result; } ConnectionPtr @@ -1435,32 +1432,31 @@ IceProxy::Ice::Object::ice_getCachedConnection() const void IceProxy::Ice::Object::ice_flushBatchRequests() { - BatchOutgoing __og(this, ice_flushBatchRequests_name); - __og.invoke(); + FlushBatch og(this, ice_flushBatchRequests_name); + og.invoke(); } ::Ice::AsyncResultPtr IceProxy::Ice::Object::begin_ice_flushBatchRequestsInternal(const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - ::IceInternal::ProxyBatchOutgoingAsyncPtr __result = - new ::IceInternal::ProxyBatchOutgoingAsync(this, ice_flushBatchRequests_name, del, cookie); + ProxyFlushBatchPtr result = new ProxyFlushBatch(this, ice_flushBatchRequests_name, del, cookie); try { - __result->__invoke(); + result->invoke(); } - catch(const Exception& __ex) + catch(const Exception& ex) { - __result->__invokeExceptionAsync(__ex); + result->abort(ex); } - return __result; + return result; } void -IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& __result) +IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& result) { - AsyncResult::__check(__result, this, ice_flushBatchRequests_name); - __result->__wait(); + AsyncResult::__check(result, this, ice_flushBatchRequests_name); + result->__wait(); } Int diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index 68ca8b02598..fb2aea3e337 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -223,7 +223,7 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co // // Don't retry invocation timeouts. // - if(dynamic_cast<const InvocationTimeoutException*>(&ex)) + if(dynamic_cast<const InvocationTimeoutException*>(&ex) || dynamic_cast<const InvocationCanceledException*>(&ex)) { ex.ice_throw(); } diff --git a/cpp/src/Ice/RequestHandler.cpp b/cpp/src/Ice/RequestHandler.cpp index 2cbf7826213..07ec753912d 100644 --- a/cpp/src/Ice/RequestHandler.cpp +++ b/cpp/src/Ice/RequestHandler.cpp @@ -14,6 +14,7 @@ using namespace std; using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(RequestHandler* p) { return p; } +IceUtil::Shared* IceInternal::upCast(CancellationHandler* p) { return p; } RetryException::RetryException(const Ice::LocalException& ex) { @@ -32,10 +33,6 @@ RetryException::get() const return _ex.get(); } -RequestHandler::~RequestHandler() -{ -} - RequestHandler::RequestHandler(const ReferencePtr& reference) : _reference(reference), _response(reference->getMode() == Reference::ModeTwoway) diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index 73c3e818b56..68ff00d647d 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -31,7 +31,7 @@ namespace IceInternal class BasicStream; -class OutgoingMessageCallback; +class OutgoingBase; // // An exception wrapper, which is used to notify that the request @@ -51,11 +51,17 @@ private: IceUtil::UniquePtr<Ice::LocalException> _ex; }; -class RequestHandler : virtual public ::IceUtil::Shared +class CancellationHandler : virtual public IceUtil::Shared { public: - virtual ~RequestHandler(); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&) = 0; + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&) = 0; +}; + +class RequestHandler : public CancellationHandler +{ +public: virtual RequestHandlerPtr connect() = 0; virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&) = 0; @@ -64,11 +70,8 @@ public: virtual void finishBatchRequest(BasicStream*) = 0; virtual void abortBatchRequest() = 0; - virtual bool sendRequest(OutgoingMessageCallback*) = 0; - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&) = 0; - - virtual void requestTimedOut(OutgoingMessageCallback*) = 0; - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&) = 0; + virtual bool sendRequest(OutgoingBase*) = 0; + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&) = 0; const ReferencePtr& getReference() const { return _reference; } // Inlined for performances. diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index adfb6bf9440..c70ed379edd 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -18,7 +18,7 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } -IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultPtr& outAsync) : +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const ProxyOutgoingAsyncBasePtr& outAsync) : _queue(queue), _outAsync(outAsync) { } @@ -26,14 +26,7 @@ IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultP void IceInternal::RetryTask::runTimerTask() { - try - { - _outAsync->__processRetry(); - } - catch(const Ice::LocalException& ex) - { - _outAsync->__invokeExceptionAsync(ex); - } + _outAsync->retry(); // Retry again the invocation. // // NOTE: this must be called last, destroy() blocks until all task @@ -44,10 +37,37 @@ IceInternal::RetryTask::runTimerTask() _queue->remove(this); } +void +IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException&) +{ + assert(false); +} + +void +IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&) +{ + if(_queue->cancel(this)) + { + // + // We just retry the outgoing async now rather than marking it + // as finished. The retry will check for the cancellation + // exception and terminate appropriately the request. + // + _outAsync->retry(); + } +} + void IceInternal::RetryTask::destroy() { - _outAsync->__invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__)); + try + { + _outAsync->abort(CommunicatorDestroyedException(__FILE__, __LINE__)); + } + catch(const CommunicatorDestroyedException&) + { + // Abort shouldn't throw if there's no callback, ignore. + } } bool @@ -61,7 +81,7 @@ IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(ins } void -IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) +IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) { Lock sync(*this); if(!_instance) @@ -78,6 +98,7 @@ IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _requests.insert(task); + out->cancelable(task); } void @@ -119,4 +140,17 @@ IceInternal::RetryQueue::remove(const RetryTaskPtr& task) } } - +bool +IceInternal::RetryQueue::cancel(const RetryTaskPtr& task) +{ + Lock sync(*this); + if(_requests.erase(task) > 0) + { + if(!_instance && _requests.empty()) + { + notify(); // If we are destroying the queue, destroy is probably waiting on the queue to be empty. + } + return _instance->timer()->cancel(task); + } + return false; +} diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h index 5a0b7cf52a2..71c9c946d8d 100644 --- a/cpp/src/Ice/RetryQueue.h +++ b/cpp/src/Ice/RetryQueue.h @@ -16,25 +16,31 @@ #include <Ice/RetryQueueF.h> #include <Ice/OutgoingAsyncF.h> #include <Ice/InstanceF.h> +#include <Ice/RequestHandler.h> // For CancellationHandler namespace IceInternal { -class RetryTask : public IceUtil::TimerTask +class RetryTask : public IceUtil::TimerTask, public CancellationHandler { public: - RetryTask(const RetryQueuePtr&, const Ice::AsyncResultPtr&); + RetryTask(const RetryQueuePtr&, const ProxyOutgoingAsyncBasePtr&); virtual void runTimerTask(); + + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); + void destroy(); bool operator<(const RetryTask&) const; + private: const RetryQueuePtr _queue; - const Ice::AsyncResultPtr _outAsync; + const ProxyOutgoingAsyncBasePtr _outAsync; }; typedef IceUtil::Handle<RetryTask> RetryTaskPtr; @@ -44,12 +50,13 @@ public: RetryQueue(const InstancePtr&); - void add(const Ice::AsyncResultPtr&, int); + void add(const ProxyOutgoingAsyncBasePtr&, int); void destroy(); private: void remove(const RetryTaskPtr&); + bool cancel(const RetryTaskPtr&); friend class RetryTask; InstancePtr _instance; diff --git a/cpp/src/Ice/winrt/Makefile.mak b/cpp/src/Ice/winrt/Makefile.mak index 7eda814e8f6..c05ba0bc7e0 100644 --- a/cpp/src/Ice/winrt/Makefile.mak +++ b/cpp/src/Ice/winrt/Makefile.mak @@ -7,13 +7,14 @@ # # ********************************************************************** -top_srcdir = ..\..\.. +top_srcdir = ..\..\.. LIBNAME = $(SDK_LIBRARY_PATH)\ice.lib TARGETS = $(LIBNAME) SOURCE_DIR = .. OBJS = $(ARCH)\$(CONFIG)\Acceptor.obj \ $(ARCH)\$(CONFIG)\ACM.obj \ + $(ARCH)\$(CONFIG)\AsyncResult.obj \ $(ARCH)\$(CONFIG)\Base64.obj \ $(ARCH)\$(CONFIG)\Buffer.obj \ $(ARCH)\$(CONFIG)\BasicStream.obj \ diff --git a/cpp/src/IceDiscovery/LocatorI.h b/cpp/src/IceDiscovery/LocatorI.h index 700f4305129..bb48d8cb509 100644 --- a/cpp/src/IceDiscovery/LocatorI.h +++ b/cpp/src/IceDiscovery/LocatorI.h @@ -13,6 +13,8 @@ #include <Ice/Locator.h> #include <Ice/ProxyF.h> +#include <set> + namespace IceDiscovery { diff --git a/cpp/src/IceDiscovery/LookupI.h b/cpp/src/IceDiscovery/LookupI.h index 1a249b4e0da..58e1e7ca044 100644 --- a/cpp/src/IceDiscovery/LookupI.h +++ b/cpp/src/IceDiscovery/LookupI.h @@ -13,6 +13,7 @@ #include <IceDiscovery/IceDiscovery.h> #include <IceDiscovery/LocatorI.h> +#include <IceUtil/Timer.h> #include <Ice/Properties.h> namespace IceDiscovery diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index 8c37a623def..78258c2ff54 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -16,6 +16,8 @@ #include <IceGrid/Query.h> #include <IceGrid/Internal.h> +#include <set> + namespace IceGrid { diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp index 5bf6c662656..57f3337c06f 100644 --- a/cpp/src/slice2cpp/Gen.cpp +++ b/cpp/src/slice2cpp/Gen.cpp @@ -373,8 +373,7 @@ Slice::Gen::generate(const UnitPtr& p) { H << "\n#include <Ice/Proxy.h>"; H << "\n#include <Ice/GCObject.h>"; - H << "\n#include <Ice/Outgoing.h>"; - H << "\n#include <Ice/OutgoingAsync.h>"; + H << "\n#include <Ice/AsyncResult.h>"; H << "\n#include <Ice/Incoming.h>"; if(p->hasContentsWithMetaData("amd")) { @@ -382,11 +381,14 @@ Slice::Gen::generate(const UnitPtr& p) } C << "\n#include <Ice/LocalException.h>"; C << "\n#include <Ice/ObjectFactory.h>"; + C << "\n#include <Ice/Outgoing.h>"; + C << "\n#include <Ice/OutgoingAsync.h>"; } else if(p->hasLocalClassDefsWithAsync()) { H << "\n#include <Ice/Proxy.h>"; - H << "\n#include <Ice/OutgoingAsync.h>"; + H << "\n#include <Ice/AsyncResult.h>"; + C << "\n#include <Ice/OutgoingAsync.h>"; } else if(p->hasNonLocalClassDecls()) { @@ -2184,26 +2186,26 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) C << flatName << ", __del, __cookie);"; C << nl << "try"; C << sb; - C << nl << "__result->__prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);"; + C << nl << "__result->prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);"; if(inParams.empty()) { - C << nl << "__result->__writeEmptyParams();"; + C << nl << "__result->writeEmptyParams();"; } else { - C << nl << "::IceInternal::BasicStream* __os = __result->__startWriteParams(" << opFormatTypeToString(p) <<");"; + C << nl << "::IceInternal::BasicStream* __os = __result->startWriteParams(" << opFormatTypeToString(p) <<");"; writeMarshalCode(C, inParams, 0, TypeContextInParam); if(p->sendsClasses(false)) { C << nl << "__os->writePendingObjects();"; } - C << nl << "__result->__endWriteParams();"; + C << nl << "__result->endWriteParams();"; } - C << nl << "__result->__invoke(true);"; + C << nl << "__result->invoke();"; C << eb; C << nl << "catch(const ::Ice::Exception& __ex)"; C << sb; - C << nl << "__result->__invokeExceptionAsync(__ex);"; + C << nl << "__result->abort(__ex);"; C << eb; C << nl << "return __result;"; C << eb; diff --git a/cpp/src/slice2cs/Gen.cpp b/cpp/src/slice2cs/Gen.cpp index b877d04ce69..8ba2d479750 100644 --- a/cpp/src/slice2cs/Gen.cpp +++ b/cpp/src/slice2cs/Gen.cpp @@ -5312,8 +5312,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) _out << sb; if(op->returnsData()) { - _out << nl << "IceInternal.OutgoingAsync outAsync__ = (IceInternal.OutgoingAsync)r__;"; - _out << nl << "IceInternal.OutgoingAsync.check(outAsync__, this, " << flatName << ");"; + _out << nl << "IceInternal.OutgoingAsync outAsync__ = IceInternal.OutgoingAsync.check(r__, this, " + << flatName << ");"; _out << nl << "try"; _out << sb; @@ -5480,11 +5480,11 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) _out << nl << "result__.writeEmptyParams();"; } - _out << nl << "result__.invoke(true);"; + _out << nl << "result__.invoke();"; _out << eb; _out << nl << "catch(Ice.Exception ex__)"; _out << sb; - _out << nl << "result__.invokeExceptionAsync(ex__);"; + _out << nl << "result__.abort(ex__);"; _out << eb; _out << nl << "return result__;"; _out << eb; diff --git a/cpp/src/slice2java/Gen.cpp b/cpp/src/slice2java/Gen.cpp index a288cfd8290..8eaccb5650e 100644 --- a/cpp/src/slice2java/Gen.cpp +++ b/cpp/src/slice2java/Gen.cpp @@ -4752,8 +4752,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) out << sb; if(op->returnsData()) { - out << nl << "IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;"; - out << nl << "IceInternal.OutgoingAsyncBase.check(__result, this, __" << op->name() << "_name);"; + out << nl << "IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __" + << op->name() << "_name);"; out << nl << "try"; out << sb; @@ -5737,11 +5737,11 @@ Slice::Gen::HelperVisitor::writeOperation(const ClassDefPtr& p, const string& pa out << nl << "__result.writeEmptyParams();"; } - out << nl << "__result.invoke(true);"; + out << nl << "__result.invoke();"; out << eb; out << nl << "catch(Ice.Exception __ex)"; out << sb; - out << nl << "__result.invokeExceptionAsync(__ex);"; + out << nl << "__result.abort(__ex);"; out << eb; out << nl << "return __result;"; out << eb; diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index c164220c2ae..406461975b7 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -1983,6 +1983,14 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(r->isSent()); test(r->isCompleted()); test(p->waitForBatch(2)); + + // Ensure it also works with a twoway proxy + cb = new FlushCallback(); + r = p->ice_getConnection()->begin_flushBatchRequests( + Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); + cb->check(); + test(r->isSent()); + test(r->isCompleted()); } { @@ -2657,6 +2665,73 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) communicator->end_flushBatchRequests(r); } } + + if(p->ice_getConnection()) + { + testController->holdAdapter(); + + Ice::AsyncResultPtr r; + Ice::ByteSeq seq; + seq.resize(1024); // Make sure the request doesn't compress too well. + for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) + { + *q = static_cast<Ice::Byte>(IceUtilInternal::random(255)); + } + while((r = p->begin_opWithPayload(seq))->sentSynchronously()); + + test(!r->isSent()); + + Ice::AsyncResultPtr r1 = p->begin_ice_ping(); + Ice::AsyncResultPtr r2 = p->begin_ice_id(); + r1->cancel(); + r2->cancel(); + try + { + p->end_ice_ping(r1); + test(false); + } + catch(const Ice::InvocationCanceledException&) + { + } + try + { + p->end_ice_id(r2); + test(false); + } + catch(const Ice::InvocationCanceledException&) + { + } + + testController->resumeAdapter(); + p->ice_ping(); + test(!r1->isSent() && r1->isCompleted()); + test(!r2->isSent() && r2->isCompleted()); + + testController->holdAdapter(); + r1 = p->begin_op(); + r2 = p->begin_ice_id(); + r1->waitForSent(); + r2->waitForSent(); + r1->cancel(); + r2->cancel(); + try + { + p->end_op(r1); + test(false); + } + catch(const Ice::InvocationCanceledException&) + { + } + try + { + p->end_ice_id(r2); + test(false); + } + catch(const Ice::InvocationCanceledException&) + { + } + testController->resumeAdapter(); + } } cout << "ok" << endl; diff --git a/cpp/test/Ice/retry/AllTests.cpp b/cpp/test/Ice/retry/AllTests.cpp index 8722e3aa32c..3df2727cfd3 100644 --- a/cpp/test/Ice/retry/AllTests.cpp +++ b/cpp/test/Ice/retry/AllTests.cpp @@ -112,7 +112,7 @@ allTests(const Ice::CommunicatorPtr& communicator) retry1->op(false); cout << "ok" << endl; - int invocationCount = 3; + testInvocationCount(3); cout << "calling operation to kill connection with second proxy... " << flush; try @@ -127,15 +127,15 @@ allTests(const Ice::CommunicatorPtr& communicator) catch(const Ice::ConnectionLostException&) { } - testInvocationCount(invocationCount + 1); + testInvocationCount(1); testFailureCount(1); testRetryCount(0); cout << "ok" << endl; cout << "calling regular operation with first proxy again... " << flush; retry1->op(false); - testInvocationCount(invocationCount + 2); - testFailureCount(1); + testInvocationCount(1); + testFailureCount(0); testRetryCount(0); cout << "ok" << endl; @@ -145,66 +145,66 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "calling regular AMI operation with first proxy... " << flush; retry1->begin_op(false, newCallback_Retry_op(cb1, &CallbackSuccess::response, &CallbackSuccess::exception)); cb1->check(); - testInvocationCount(invocationCount + 3); - testFailureCount(1); + testInvocationCount(1); + testFailureCount(0); testRetryCount(0); cout << "ok" << endl; cout << "calling AMI operation to kill connection with second proxy... " << flush; retry2->begin_op(true, newCallback_Retry_op(cb2, &CallbackFail::response, &CallbackFail::exception)); cb2->check(); - testInvocationCount(invocationCount + 4); - testFailureCount(2); + testInvocationCount(1); + testFailureCount(1); testRetryCount(0); cout << "ok" << endl; cout << "calling regular AMI operation with first proxy again... " << flush; retry1->begin_op(false, newCallback_Retry_op(cb1, &CallbackSuccess::response, &CallbackSuccess::exception)); cb1->check(); - testInvocationCount(invocationCount + 5); - testFailureCount(2); + testInvocationCount(1); + testFailureCount(0); testRetryCount(0); cout << "ok" << endl; cout << "testing idempotent operation... " << flush; - test(retry1->opIdempotent(0) == 4); - testInvocationCount(invocationCount + 6); - testFailureCount(2); + test(retry1->opIdempotent(4) == 4); + testInvocationCount(1); + testFailureCount(0); + testRetryCount(4); + test(retry1->end_opIdempotent(retry1->begin_opIdempotent(4)) == 4); + testInvocationCount(1); + testFailureCount(0); testRetryCount(4); - test(retry1->end_opIdempotent(retry1->begin_opIdempotent(4)) == 8); - testInvocationCount(invocationCount + 7); - testFailureCount(2); - testRetryCount(8); cout << "ok" << endl; cout << "testing non-idempotent operation... " << flush; try { - retry1->opNotIdempotent(8); + retry1->opNotIdempotent(); test(false); } catch(const Ice::LocalException&) { } - testInvocationCount(invocationCount + 8); - testFailureCount(3); - testRetryCount(8); + testInvocationCount(1); + testFailureCount(1); + testRetryCount(0); try { - retry1->end_opNotIdempotent(retry1->begin_opNotIdempotent(9)); + retry1->end_opNotIdempotent(retry1->begin_opNotIdempotent()); test(false); } catch(const Ice::LocalException&) { } - testInvocationCount(invocationCount + 9); - testFailureCount(4); - testRetryCount(8); + testInvocationCount(1); + testFailureCount(1); + testRetryCount(0); cout << "ok" << endl; if(!retry1->ice_getConnection()) { - invocationCount = invocationCount + 10; + testInvocationCount(-1); cout << "testing system exception... " << flush; try { @@ -214,9 +214,9 @@ allTests(const Ice::CommunicatorPtr& communicator) catch(const SystemFailure&) { } - test(invocationCount + 1); - testFailureCount(5); - testRetryCount(8); + testInvocationCount(1); + testFailureCount(1); + testRetryCount(0); try { retry1->end_opSystemException(retry1->begin_opSystemException()); @@ -225,11 +225,38 @@ allTests(const Ice::CommunicatorPtr& communicator) catch(const SystemFailure&) { } - testInvocationCount(invocationCount + 2); - testFailureCount(6); - testRetryCount(8); + testInvocationCount(1); + testFailureCount(1); + testRetryCount(0); cout << "ok" << endl; } + cout << "testing invocation timeout and retries... " << flush; + try + { + retry1->ice_invocationTimeout(50)->opIdempotent(4); // No more than 2 retries before timeout kicks-in + test(false); + } + catch(const Ice::InvocationTimeoutException&) + { + testRetryCount(2); + retry1->opIdempotent(-1); // Reset the counter + testRetryCount(-1); + } + try + { + // No more than 2 retries before timeout kicks-in + RetryPrx prx = retry1->ice_invocationTimeout(50); + prx->end_opIdempotent(prx->begin_opIdempotent(4)); + test(false); + } + catch(const Ice::InvocationTimeoutException&) + { + testRetryCount(2); + retry1->opIdempotent(-1); + testRetryCount(-1); + } + cout << "ok" << endl; + return retry1; } diff --git a/cpp/test/Ice/retry/Client.cpp b/cpp/test/Ice/retry/Client.cpp index 5b877b2c1a9..53bed9b8c2f 100644 --- a/cpp/test/Ice/retry/Client.cpp +++ b/cpp/test/Ice/retry/Client.cpp @@ -38,7 +38,7 @@ main(int argc, char* argv[]) initData.properties = Ice::createProperties(argc, argv); initData.observer = getObserver(); - initData.properties->setProperty("Ice.RetryIntervals", "0 10 20 30"); + initData.properties->setProperty("Ice.RetryIntervals", "0 1 100 1"); // // This test kills connections, so we don't want warnings. diff --git a/cpp/test/Ice/retry/Collocated.cpp b/cpp/test/Ice/retry/Collocated.cpp index 8bb1be3b5ca..46729831b47 100644 --- a/cpp/test/Ice/retry/Collocated.cpp +++ b/cpp/test/Ice/retry/Collocated.cpp @@ -45,7 +45,7 @@ main(int argc, char* argv[]) initData.properties = Ice::createProperties(argc, argv); initData.observer = getObserver(); - initData.properties->setProperty("Ice.RetryIntervals", "0 10 20 30"); + initData.properties->setProperty("Ice.RetryIntervals", "0 1 100 1"); // // This test kills connections, so we don't want warnings. diff --git a/cpp/test/Ice/retry/InstrumentationI.cpp b/cpp/test/Ice/retry/InstrumentationI.cpp index a39a2804b38..f60ba5706c9 100644 --- a/cpp/test/Ice/retry/InstrumentationI.cpp +++ b/cpp/test/Ice/retry/InstrumentationI.cpp @@ -143,6 +143,12 @@ Ice::Instrumentation::CommunicatorObserverPtr communicatorObserver = new Communi void testEqual(int& value, int expected) { + if(expected < 0) + { + value = 0; + return; + } + int retry = 0; while(++retry < 100) { @@ -160,6 +166,7 @@ testEqual(int& value, int expected) std::cerr << "value = " << value << ", expected = " << expected << std::endl; test(false); } + value = 0; } } diff --git a/cpp/test/Ice/retry/Test.ice b/cpp/test/Ice/retry/Test.ice index 61297d94557..cd8c2595796 100644 --- a/cpp/test/Ice/retry/Test.ice +++ b/cpp/test/Ice/retry/Test.ice @@ -17,10 +17,10 @@ interface Retry void op(bool kill); idempotent int opIdempotent(int c); - void opNotIdempotent(int c); + void opNotIdempotent(); void opSystemException(); - void shutdown(); + idempotent void shutdown(); }; }; diff --git a/cpp/test/Ice/retry/TestI.cpp b/cpp/test/Ice/retry/TestI.cpp index b3545fa4c7a..9a0ed3fc81d 100644 --- a/cpp/test/Ice/retry/TestI.cpp +++ b/cpp/test/Ice/retry/TestI.cpp @@ -11,13 +11,6 @@ #include <TestI.h> #include <SystemFailure.h> -namespace -{ - -const int nRetry = 4; // See Ice.RetryIntervals configuration in Client.cpp/Collocated.cpp - -} - RetryI::RetryI() : _counter(0) { } @@ -39,9 +32,15 @@ RetryI::op(bool kill, const Ice::Current& current) } int -RetryI::opIdempotent(int counter, const Ice::Current& current) +RetryI::opIdempotent(int nRetry, const Ice::Current& current) { - if(counter + nRetry > _counter) + if(nRetry < 0) + { + _counter = 0; + return 0; + } + + if(nRetry > _counter) { ++_counter; if(current.con) @@ -52,19 +51,16 @@ RetryI::opIdempotent(int counter, const Ice::Current& current) { throw Ice::ConnectionLostException(__FILE__, __LINE__); } + return 0; } - return _counter; + int counter = _counter; + _counter = 0; + return counter; } void -RetryI::opNotIdempotent(int counter, const Ice::Current& current) +RetryI::opNotIdempotent(const Ice::Current& current) { - if(_counter != counter) - { - return; - } - - ++_counter; if(current.con) { current.con->close(true); diff --git a/cpp/test/Ice/retry/TestI.h b/cpp/test/Ice/retry/TestI.h index 24405acbf99..c049fadfbd6 100644 --- a/cpp/test/Ice/retry/TestI.h +++ b/cpp/test/Ice/retry/TestI.h @@ -20,7 +20,7 @@ public: virtual void op(bool, const Ice::Current&); virtual int opIdempotent(int, const Ice::Current&); - virtual void opNotIdempotent(int, const Ice::Current&); + virtual void opNotIdempotent(const Ice::Current&); virtual void opSystemException(const Ice::Current&); virtual void shutdown(const Ice::Current&); |