// ********************************************************************** // // Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #ifndef ICE_OUTGOING_ASYNC_H #define ICE_OUTGOING_ASYNC_H #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef ICE_CPP11_MAPPING # include # include #endif #include namespace IceInternal { class RetryException; class CollocatedRequestHandler; class ICE_API OutgoingAsyncCompletionCallback { protected: virtual bool handleSent(bool, bool) = 0; virtual bool handleException(const Ice::Exception&) = 0; virtual bool handleResponse(bool) = 0; virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const = 0; virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const = 0; virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const = 0; }; // // Base class for handling asynchronous invocations. This class is // responsible for the handling of the output stream and the child // invocation observer. // class ICE_API OutgoingAsyncBase : virtual public OutgoingAsyncCompletionCallback, #ifndef ICE_CPP11_MAPPING public Ice::AsyncResult, #endif public Ice::EnableSharedFromThis { public: virtual bool sent(); virtual bool exception(const Ice::Exception&); virtual bool response(); void invokeSentAsync(); void invokeExceptionAsync(); void invokeResponseAsync(); void invokeSent(); void invokeException(); void invokeResponse(); virtual void cancelable(const IceInternal::CancellationHandlerPtr&); void cancel(); #ifndef ICE_CPP11_MAPPING virtual Ice::Int getHash() const; virtual Ice::CommunicatorPtr getCommunicator() const; virtual Ice::ConnectionPtr getConnection() const; virtual Ice::ObjectPrx getProxy() const; virtual Ice::LocalObjectPtr getCookie() const; virtual const std::string& getOperation() const; virtual bool isCompleted() const; virtual void waitForCompleted(); virtual bool isSent() const; virtual void waitForSent(); virtual bool sentSynchronously() const; virtual void throwLocalException() const; virtual bool __wait(); virtual Ice::InputStream* __startReadParams(); virtual void __endReadParams(); virtual void __readEmptyParams(); virtual void __readParamEncaps(const ::Ice::Byte*&, ::Ice::Int&); virtual void __throwUserException(); #endif void attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, Ice::Int requestId) { const Ice::Int size = static_cast(_os.b.size() - headerSize - 4); _childObserver.attach(getObserver().getRemoteObserver(c, endpt, requestId, size)); } void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) { const Ice::Int size = static_cast(_os.b.size() - headerSize - 4); _childObserver.attach(getObserver().getCollocatedObserver(adapter, requestId, size)); } Ice::OutputStream* getOs() { return &_os; } Ice::InputStream* getIs() { return &_is; } protected: OutgoingAsyncBase(const InstancePtr&); bool sentImpl(bool); bool exceptionImpl(const Ice::Exception&); bool responseImpl(bool); void cancel(const Ice::LocalException&); void checkCanceled(); void warning(const std::exception&) const; void warning() const; // // This virtual method is necessary for the communicator flush // batch requests implementation. // virtual IceInternal::InvocationObserver& getObserver() { return _observer; } const InstancePtr _instance; Ice::ConnectionPtr _cachedConnection; bool _sentSynchronously; bool _doneInSent; unsigned char _state; #ifdef ICE_CPP11_MAPPING std::mutex _m; using Lock = std::lock_guard; std::exception_ptr _ex; std::exception_ptr _cancellationException; #else IceUtil::Monitor _m; typedef IceUtil::Monitor::Lock Lock; IceUtil::UniquePtr _ex; IceUtil::UniquePtr _cancellationException; Ice::LocalObjectPtr _cookie; #endif InvocationObserver _observer; ObserverHelperT _childObserver; Ice::OutputStream _os; Ice::InputStream _is; CancellationHandlerPtr _cancellationHandler; static const unsigned char OK; static const unsigned char Sent; #ifndef ICE_CPP11_MAPPING static const unsigned char Done; static const unsigned char EndCalled; #endif }; // // Base class for proxy based invocations. This class handles the // retry for proxy invocations. It also ensures the child observer is // correct notified of failures and make sure the retry task is // correctly canceled when the invocation completes. // class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, public IceUtil::TimerTask, public Ice::EnableSharedFromThis { public: virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0; virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) = 0; virtual bool exception(const Ice::Exception&); virtual void cancelable(const CancellationHandlerPtr&); void retryException(const Ice::Exception&); void retry(); void abort(const Ice::Exception&); using Ice::EnableSharedFromThis::shared_from_this; #ifndef ICE_CPP11_MAPPING virtual Ice::ObjectPrx getProxy() const; virtual Ice::CommunicatorPtr getCommunicator() const; #endif protected: ProxyOutgoingAsyncBase(const Ice::ObjectPrxPtr&); ~ProxyOutgoingAsyncBase(); void invokeImpl(bool); bool sentImpl(bool); bool exceptionImpl(const Ice::Exception&); bool responseImpl(bool); virtual void runTimerTask(); const Ice::ObjectPrxPtr _proxy; RequestHandlerPtr _handler; Ice::OperationMode _mode; private: int _cnt; bool _sent; }; // // Class for handling Slice operation invocations // class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase { public: OutgoingAsync(const Ice::ObjectPrxPtr&); void prepare(const std::string&, Ice::OperationMode, const Ice::Context&); virtual bool sent(); virtual bool response(); virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); void abort(const Ice::Exception&); void invoke(const std::string&); #ifdef ICE_CPP11_MAPPING void invoke(const std::string&, Ice::OperationMode, Ice::FormatType, const Ice::Context&, const std::function&); void throwUserException(); #endif Ice::OutputStream* startWriteParams(Ice::FormatType format) { _os.startEncapsulation(_encoding, format); return &_os; } void endWriteParams() { _os.endEncapsulation(); } void writeEmptyParams() { _os.writeEmptyEncapsulation(_encoding); } void writeParamEncaps(const ::Ice::Byte* encaps, ::Ice::Int size) { if(size == 0) { _os.writeEmptyEncapsulation(_encoding); } else { _os.writeEncapsulation(encaps, size); } } protected: const Ice::EncodingVersion _encoding; #ifdef ICE_CPP11_MAPPING std::function _userException; #endif bool _synchronous; }; // // Class for handling the proxy's begin_ice_flushBatchRequest request. // class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase { public: ProxyFlushBatchAsync(const Ice::ObjectPrxPtr&); virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); void invoke(const std::string&); private: int _batchRequestNum; }; typedef IceUtil::Handle ProxyFlushBatchAsyncPtr; // // Class for handling the proxy's begin_ice_getConnection request. // class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase { public: ProxyGetConnection(const Ice::ObjectPrxPtr&); virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); void invoke(const std::string&); }; typedef IceUtil::Handle ProxyGetConnectionPtr; // // Class for handling Ice::Connection::begin_flushBatchRequests // class ICE_API ConnectionFlushBatchAsync : public OutgoingAsyncBase { public: ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&); virtual Ice::ConnectionPtr getConnection() const; void invoke(const std::string&); private: const Ice::ConnectionIPtr _connection; }; typedef IceUtil::Handle ConnectionFlushBatchAsyncPtr; // // Class for handling Ice::Communicator::begin_flushBatchRequests // class ICE_API CommunicatorFlushBatchAsync : public OutgoingAsyncBase, public Ice::EnableSharedFromThis { public: CommunicatorFlushBatchAsync(const InstancePtr&); void flushConnection(const Ice::ConnectionIPtr&); void invoke(const std::string&); using Ice::EnableSharedFromThis::shared_from_this; private: void check(bool); int _useCount; InvocationObserver _observer; }; } namespace IceInternal { #ifdef ICE_CPP11_MAPPING class ICE_API LambdaInvoke : virtual public OutgoingAsyncCompletionCallback { public: LambdaInvoke(std::function&& exception, std::function&& sent) : _exception(std::move(exception)), _sent(std::move(sent)) { } protected: virtual bool handleSent(bool, bool) override; virtual bool handleException(const Ice::Exception&) override; virtual bool handleResponse(bool) override; virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const override; virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const override; virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const override; std::function _exception; std::function _sent; std::function _response; }; template class PromiseInvoke : virtual public OutgoingAsyncCompletionCallback { public: auto getFuture() -> decltype(std::declval().get_future()) { return _promise.get_future(); } protected: Promise _promise; std::function _response; private: virtual bool handleSent(bool, bool) override { return false; } virtual bool handleException(const Ice::Exception& ex) override { try { ex.ice_throw(); } catch(const Ice::Exception&) { _promise.set_exception(std::current_exception()); } return false; } virtual bool handleResponse(bool ok) override { _response(ok); return false; } virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const override { assert(false); } virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const override { assert(false); } virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const override { assert(false); } }; template class OutgoingAsyncT : public OutgoingAsync { public: using OutgoingAsync::OutgoingAsync; void invoke(const std::string& operation, Ice::OperationMode mode, Ice::FormatType format, const Ice::Context& ctx, std::function&& write, std::function&& userException) { _read = [](Ice::InputStream* stream) { T v; stream->read(v); return v; }; _userException = std::move(userException); OutgoingAsync::invoke(operation, mode, format, ctx, write); } void invoke(const std::string& operation, Ice::OperationMode mode, Ice::FormatType format, const Ice::Context& ctx, std::function&& write, std::function&& userException, std::function&& read) { _read = std::move(read); _userException = std::move(userException); OutgoingAsync::invoke(operation, mode, format, ctx, write); } protected: std::function _read; }; template<> class ICE_API OutgoingAsyncT : public OutgoingAsync { public: using OutgoingAsync::OutgoingAsync; void invoke(const std::string& operation, Ice::OperationMode mode, Ice::FormatType format, const Ice::Context& ctx, std::function&& write, std::function&& userException) { _userException = std::move(userException); OutgoingAsync::invoke(operation, mode, format, ctx, write); } }; template class LambdaOutgoing : public OutgoingAsyncT, public LambdaInvoke { public: LambdaOutgoing(const std::shared_ptr& proxy, std::function response, std::function& ex, std::function& sent) : OutgoingAsyncT(proxy), LambdaInvoke(std::move(ex), std::move(sent)) { _response = [this, response](bool ok) { if(ok) { assert(this->_read); this->_is.startEncapsulation(); R v = this->_read(&this->_is); this->_is.endEncapsulation(); if(response) { try { response(std::move(v)); } catch(...) { throw std::current_exception(); } } } else { this->throwUserException(); } }; } }; template<> class ICE_API LambdaOutgoing : public OutgoingAsyncT, public LambdaInvoke { public: LambdaOutgoing(const std::shared_ptr& proxy, std::function response, std::function& ex, std::function& sent) : OutgoingAsyncT(proxy), LambdaInvoke(std::move(ex), std::move(sent)) { _response = [this, response](bool ok) { if(this->_is.b.empty()) { // // If there's no response (oneway, batch-oneway proxies), we just set the promise // on completion without reading anything from the input stream. This is required for // batch invocations. // if(response) { try { response(); } catch(...) { throw std::current_exception(); } } } else if(ok) { this->_is.skipEmptyEncapsulation(); if(response) { try { response(); } catch(...) { throw std::current_exception(); } } } else { this->throwUserException(); } }; } }; template class PromiseOutgoing : public OutgoingAsyncT, public PromiseInvoke

{ public: PromiseOutgoing(const std::shared_ptr& proxy, bool synchronous) : OutgoingAsyncT(proxy) { this->_synchronous = synchronous; this->_response = [this](bool ok) { if(ok) { assert(this->_read); this->_is.startEncapsulation(); R v = this->_read(&this->_is); this->_is.endEncapsulation(); this->_promise.set_value(v); } else { this->throwUserException(); } }; } }; template class PromiseOutgoing : public OutgoingAsyncT, public PromiseInvoke

{ public: PromiseOutgoing(const std::shared_ptr& proxy, bool synchronous) : OutgoingAsyncT(proxy) { this->_synchronous = synchronous; this->_response = [&](bool ok) { if(this->_is.b.empty()) { // // If there's no response (oneway, batch-oneway proxies), we just set the promise // on completion without reading anything from the input stream. This is required for // batch invocations. // this->_promise.set_value(); } else if(ok) { this->_is.skipEmptyEncapsulation(); this->_promise.set_value(); } else { this->throwUserException(); } }; } virtual bool handleSent(bool done, bool) override { if(done) { PromiseInvoke

::_promise.set_value(); } return false; } }; #else // // Base class for all callbacks. // class ICE_API CallbackBase : public IceUtil::Shared { public: void checkCallback(bool, bool); virtual void completed(const ::Ice::AsyncResultPtr&) const = 0; virtual IceUtil::Handle verify(const ::Ice::LocalObjectPtr&) = 0; virtual void sent(const ::Ice::AsyncResultPtr&) const = 0; virtual bool hasSentCallback() const = 0; }; typedef IceUtil::Handle CallbackBasePtr; // // Base class for generic callbacks. // class ICE_API GenericCallbackBase : public virtual CallbackBase { }; // // See comments in OutgoingAsync.cpp // extern ICE_API CallbackBasePtr __dummyCallback; // // Generic callback template that requires the caller to down-cast the // proxy and the cookie that are obtained from the AsyncResult. // template class AsyncCallback : public GenericCallbackBase { public: typedef T callback_type; typedef IceUtil::Handle TPtr; typedef void (T::*Callback)(const ::Ice::AsyncResultPtr&); AsyncCallback(const TPtr& instance, Callback cb, Callback sentcb = 0) : _callback(instance), _completed(cb), _sent(sentcb) { checkCallback(instance, cb != 0); } virtual void completed(const ::Ice::AsyncResultPtr& result) const { (_callback.get()->*_completed)(result); } virtual CallbackBasePtr verify(const ::Ice::LocalObjectPtr&) { return this; // Nothing to do, the cookie is not type-safe. } virtual void sent(const ::Ice::AsyncResultPtr& result) const { if(_sent) { (_callback.get()->*_sent)(result); } } virtual bool hasSentCallback() const { return _sent != 0; } private: TPtr _callback; Callback _completed; Callback _sent; }; class CallbackCompletion : virtual public OutgoingAsyncCompletionCallback { public: CallbackCompletion(const CallbackBasePtr& cb, const Ice::LocalObjectPtr& cookie) : _callback(cb) { if(!_callback) { throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); } const_cast(_callback) = _callback->verify(cookie); } virtual bool handleSent(bool, bool alreadySent) { return _callback && _callback->hasSentCallback() && !alreadySent; } virtual bool handleException(const Ice::Exception&) { return _callback; } virtual bool handleResponse(bool) { return _callback; } virtual void handleInvokeSent(bool, OutgoingAsyncBase* outAsync) const { _callback->sent(outAsync); } virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase* outAsync) const { _callback->completed(outAsync); } virtual void handleInvokeResponse(bool, OutgoingAsyncBase* outAsync) const { _callback->completed(outAsync); } private: const CallbackBasePtr _callback; }; class ICE_API CallbackOutgoing : public OutgoingAsync, public CallbackCompletion { public: CallbackOutgoing(const Ice::ObjectPrx& proxy, const std::string& operation, const CallbackBasePtr& cb, const Ice::LocalObjectPtr& cookie) : OutgoingAsync(proxy), CallbackCompletion(cb, cookie), _operation(operation) { _cookie = cookie; } virtual const std::string& getOperation() const { return _operation; } private: const std::string& _operation; }; #endif } #ifndef ICE_CPP11_MAPPING namespace Ice { typedef IceUtil::Handle< ::IceInternal::GenericCallbackBase> CallbackPtr; template CallbackPtr newCallback(const IceUtil::Handle& instance, void (T::*cb)(const AsyncResultPtr&), void (T::*sentcb)(const AsyncResultPtr&) = 0) { return new ::IceInternal::AsyncCallback(instance, cb, sentcb); } template CallbackPtr newCallback(T* instance, void (T::*cb)(const AsyncResultPtr&), void (T::*sentcb)(const AsyncResultPtr&) = 0) { return new ::IceInternal::AsyncCallback(instance, cb, sentcb); } } // // Operation callbacks are specified in Proxy.h // #endif #endif