diff options
Diffstat (limited to 'cpp/include')
-rw-r--r-- | cpp/include/Ice/.headers | 2 | ||||
-rw-r--r-- | cpp/include/Ice/BatchRequestInterceptor.h | 48 | ||||
-rw-r--r-- | cpp/include/Ice/BatchRequestQueueF.h | 25 | ||||
-rw-r--r-- | cpp/include/Ice/Initialize.h | 6 | ||||
-rw-r--r-- | cpp/include/Ice/Outgoing.h | 104 | ||||
-rw-r--r-- | cpp/include/Ice/OutgoingAsync.h | 63 | ||||
-rw-r--r-- | cpp/include/Ice/OutgoingAsyncF.h | 6 | ||||
-rw-r--r-- | cpp/include/Ice/Proxy.h | 6 |
8 files changed, 183 insertions, 77 deletions
diff --git a/cpp/include/Ice/.headers b/cpp/include/Ice/.headers index 5cad480ba31..47a6188f281 100644 --- a/cpp/include/Ice/.headers +++ b/cpp/include/Ice/.headers @@ -4,6 +4,8 @@ SDK_HEADERS = \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\AsyncResult.h \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\AsyncResultF.h \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BasicStream.h \ + $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BatchRequestInterceptor.h \ + $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BatchRequestQueueF.h \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\Buffer.h \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BuiltinSequences.h \ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\Communicator.h \ diff --git a/cpp/include/Ice/BatchRequestInterceptor.h b/cpp/include/Ice/BatchRequestInterceptor.h new file mode 100644 index 00000000000..4c05546cdf0 --- /dev/null +++ b/cpp/include/Ice/BatchRequestInterceptor.h @@ -0,0 +1,48 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_BATCH_REQUEST_INTERCEPTOR_H +#define ICE_BATCH_REQUEST_INTERCEPTOR_H + +#include <IceUtil/Shared.h> + +#include <Ice/ProxyF.h> +#ifdef ICE_CPP11 +# include <functional> +#endif + +namespace Ice +{ + +class BatchRequest +{ +public: + + virtual void enqueue() const = 0; + virtual int getSize() const = 0; + virtual const std::string& getOperation() const = 0; + virtual const Ice::ObjectPrx& getProxy() const = 0; +}; + +class BatchRequestInterceptor : public IceUtil::Shared +{ +public: + + virtual void enqueue(const BatchRequest&, int, int) = 0; +}; +typedef IceUtil::Handle<BatchRequestInterceptor> BatchRequestInterceptorPtr; + +#ifdef ICE_CPP11 +ICE_API BatchRequestInterceptorPtr +newBatchRequestInterceptor(const ::std::function<void (const BatchRequest&, int, int)>&); +#endif + +}; + +#endif diff --git a/cpp/include/Ice/BatchRequestQueueF.h b/cpp/include/Ice/BatchRequestQueueF.h new file mode 100644 index 00000000000..3d33a0e0dc8 --- /dev/null +++ b/cpp/include/Ice/BatchRequestQueueF.h @@ -0,0 +1,25 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_BATCH_REQUEST_QUEUE_F_H +#define ICE_BATCH_REQUEST_QUEUE_F_H + +#include <IceUtil/Shared.h> +#include <Ice/Handle.h> + +namespace IceInternal +{ + +class BatchRequestQueue; +ICE_API IceUtil::Shared* upCast(BatchRequestQueue*); +typedef IceInternal::Handle<BatchRequestQueue> BatchRequestQueuePtr; + +} + +#endif diff --git a/cpp/include/Ice/Initialize.h b/cpp/include/Ice/Initialize.h index e90d98803d2..8cbeeed2c91 100644 --- a/cpp/include/Ice/Initialize.h +++ b/cpp/include/Ice/Initialize.h @@ -22,6 +22,7 @@ #include <Ice/BuiltinSequences.h> #include <Ice/Version.h> #include <Ice/Plugin.h> +#include <Ice/BatchRequestInterceptor.h> namespace Ice { @@ -86,6 +87,7 @@ struct InitializationData ThreadNotificationPtr threadHook; DispatcherPtr dispatcher; CompactIdResolverPtr compactIdResolver; + BatchRequestInterceptorPtr batchRequestInterceptor; }; ICE_API CommunicatorPtr initialize(int&, char*[], const InitializationData& = InitializationData(), @@ -94,7 +96,7 @@ ICE_API CommunicatorPtr initialize(int&, char*[], const InitializationData& = In ICE_API CommunicatorPtr initialize(Ice::StringSeq&, const InitializationData& = InitializationData(), Int = ICE_INT_VERSION); -ICE_API CommunicatorPtr initialize(const InitializationData& = InitializationData(), +ICE_API CommunicatorPtr initialize(const InitializationData& = InitializationData(), Int = ICE_INT_VERSION); @@ -111,7 +113,7 @@ ICE_API InputStreamPtr createInputStream(const CommunicatorPtr&, ICE_API InputStreamPtr wrapInputStream(const CommunicatorPtr&, const ::std::pair< const Ice::Byte*, const Ice::Byte*>&); ICE_API InputStreamPtr wrapInputStream(const CommunicatorPtr&, - const ::std::pair< const Ice::Byte*, const Ice::Byte*>&, + const ::std::pair< const Ice::Byte*, const Ice::Byte*>&, const EncodingVersion&); ICE_API OutputStreamPtr createOutputStream(const CommunicatorPtr&); diff --git a/cpp/include/Ice/Outgoing.h b/cpp/include/Ice/Outgoing.h index fb469a2ca6c..62aea1f7c63 100644 --- a/cpp/include/Ice/Outgoing.h +++ b/cpp/include/Ice/Outgoing.h @@ -40,12 +40,10 @@ class ICE_API OutgoingBase : private IceUtil::noncopyable public: virtual ~OutgoingBase() { } - - virtual bool send(const Ice::ConnectionIPtr&, bool, bool) = 0; - virtual void invokeCollocated(CollocatedRequestHandler*) = 0; virtual void sent() = 0; virtual void completed(const Ice::Exception&) = 0; + virtual void completed(BasicStream&) = 0; virtual void retryException(const Ice::Exception&) = 0; BasicStream* os() { return &_os; } @@ -64,7 +62,7 @@ public: protected: - OutgoingBase(Instance*, const std::string&); + OutgoingBase(Instance*); BasicStream _os; IceUtil::UniquePtr<Ice::Exception> _exception; @@ -75,23 +73,60 @@ protected: IceUtil::Monitor<IceUtil::Mutex> _monitor; }; -class ICE_API Outgoing : public OutgoingBase +class ICE_API ProxyOutgoingBase : public OutgoingBase { public: - Outgoing(IceProxy::Ice::Object*, const std::string&, Ice::OperationMode, const Ice::Context*); - ~Outgoing(); + ProxyOutgoingBase(IceProxy::Ice::Object*, Ice::OperationMode); + ~ProxyOutgoingBase(); - virtual bool send(const Ice::ConnectionIPtr&, bool, bool); - virtual void invokeCollocated(CollocatedRequestHandler*); + virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0; + virtual void invokeCollocated(CollocatedRequestHandler*) = 0; virtual void sent(); virtual void completed(const Ice::Exception&); + virtual void completed(BasicStream&); virtual void retryException(const Ice::Exception&); +protected: + + bool invokeImpl(); // Returns true if ok, false if user exception. + + // + // Optimization. The request handler and the reference may not be + // deleted while a stack-allocated Outgoing still holds it. + // + IceProxy::Ice::Object* _proxy; + Ice::OperationMode _mode; + RequestHandlerPtr _handler; + IceUtil::Time _invocationTimeoutDeadline; + + enum + { + StateUnsent, + StateInProgress, + StateRetry, + StateOK, + StateUserException, + StateLocalException, + StateFailed + } _state; +}; + +class ICE_API Outgoing : public ProxyOutgoingBase +{ +public: + + Outgoing(IceProxy::Ice::Object*, const std::string&, Ice::OperationMode, const Ice::Context*); + ~Outgoing(); + + virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool); + virtual void invokeCollocated(CollocatedRequestHandler*); + bool invoke(); // Returns true if ok, false if user exception. void abort(const Ice::LocalException&); - void completed(BasicStream&); + + virtual void completed(BasicStream&); // Inlined for speed optimization. BasicStream* startReadParams() @@ -137,7 +172,7 @@ public: } } - bool hasResponse() + bool hasResponse() { return !_is.b.empty(); } @@ -146,49 +181,42 @@ public: private: - // - // Optimization. The request handler and the reference may not be - // deleted while a stack-allocated Outgoing still holds it. - // - IceProxy::Ice::Object* _proxy; - Ice::OperationMode _mode; - RequestHandlerPtr _handler; - IceUtil::Time _invocationTimeoutDeadline; - - enum - { - StateUnsent, - StateInProgress, - StateRetry, - StateOK, - StateUserException, - StateLocalException, - StateFailed - } _state; - Ice::EncodingVersion _encoding; BasicStream _is; + const std::string& _operation; }; -class FlushBatch : public OutgoingBase +class ProxyFlushBatch : public ProxyOutgoingBase { public: - FlushBatch(IceProxy::Ice::Object*, const std::string&); - FlushBatch(Ice::ConnectionI*, Instance*, const std::string&); - - void invoke(); + ProxyFlushBatch(IceProxy::Ice::Object*, const std::string&); - virtual bool send(const Ice::ConnectionIPtr&, bool, bool); + virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual void invokeCollocated(CollocatedRequestHandler*); + void invoke(); + +private: + + int _batchRequestNum; +}; + +class ConnectionFlushBatch : public OutgoingBase +{ +public: + + ConnectionFlushBatch(Ice::ConnectionI*, Instance*, const std::string&); + + void invoke(); + virtual void sent(); virtual void completed(const Ice::Exception&); + virtual void completed(BasicStream&); virtual void retryException(const Ice::Exception&); private: - IceProxy::Ice::Object* _proxy; Ice::ConnectionI* _connection; }; diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index ed1e72f0521..440a39138fb 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -32,16 +32,9 @@ class ICE_API OutgoingAsyncBase : public Ice::AsyncResult { public: - // - // Those methods must be overriden if the invocation is sent - // through a request handler. - // - virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool) { assert(false); return AsyncStatusQueued; } - virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) { assert(false); return AsyncStatusQueued; } - virtual bool sent(); virtual bool completed(const Ice::Exception&); - virtual void retryException(const Ice::Exception&); + virtual bool completed(); // Those methods are public when called from an OutgoingAsyncBase reference. using Ice::AsyncResult::cancelable; @@ -55,7 +48,7 @@ public: const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4); _childObserver.attach(getObserver().getRemoteObserver(c, endpt, requestId, size)); } - + void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId) { const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4); @@ -67,9 +60,11 @@ public: return &_os; } + virtual BasicStream* getIs(); + protected: - OutgoingAsyncBase(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&, + OutgoingAsyncBase(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); bool sent(bool); @@ -90,11 +85,14 @@ class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, protected IceUt { public: + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0; + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) = 0; + virtual Ice::ObjectPrx getProxy() const; using OutgoingAsyncBase::sent; virtual bool completed(const Ice::Exception&); - virtual void retryException(const Ice::Exception&); + void retryException(const Ice::Exception&); virtual void cancelable(const CancellationHandlerPtr&); void retry(); @@ -102,7 +100,7 @@ public: protected: - ProxyOutgoingAsyncBase(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, + ProxyOutgoingAsyncBase(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); void invokeImpl(bool); @@ -111,8 +109,7 @@ protected: bool finished(const Ice::Exception&); bool finished(bool); - virtual void handleRetryException(const Ice::Exception&); - virtual int handleException(const Ice::Exception&); + int handleException(const Ice::Exception&); virtual void runTimerTask(); const Ice::ObjectPrx _proxy; @@ -138,14 +135,15 @@ public: virtual bool sent(); - virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); + using ProxyOutgoingAsyncBase::completed; + virtual bool completed(); + void abort(const Ice::Exception&); void invoke(); - using ProxyOutgoingAsyncBase::completed; - bool completed(); BasicStream* startWriteParams(Ice::FormatType format) { @@ -172,7 +170,7 @@ public: } } - BasicStream* getIs() + virtual BasicStream* getIs() { return &_is; } @@ -185,23 +183,22 @@ private: // // Class for handling the proxy's begin_ice_flushBatchRequest request. // -class ICE_API ProxyFlushBatch : public ProxyOutgoingAsyncBase +class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase { public: - ProxyFlushBatch(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); + ProxyFlushBatchAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); void invoke(); private: - virtual void handleRetryException(const Ice::Exception&); - virtual int handleException(const Ice::Exception&); + int _batchRequestNum; }; -typedef IceUtil::Handle<ProxyFlushBatch> ProxyFlushBatchPtr; +typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr; // // Class for handling the proxy's begin_ice_getConnection request. @@ -212,7 +209,7 @@ public: ProxyGetConnection(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); void invoke(); @@ -222,13 +219,13 @@ typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr; // // Class for handling Ice::Connection::begin_flushBatchRequests // -class ICE_API ConnectionFlushBatch : public OutgoingAsyncBase +class ICE_API ConnectionFlushBatchAsync : public OutgoingAsyncBase { public: - ConnectionFlushBatch(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&, - const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); - + ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&, + const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&); + virtual Ice::ConnectionPtr getConnection() const; void invoke(); @@ -237,17 +234,17 @@ private: const Ice::ConnectionIPtr _connection; }; -typedef IceUtil::Handle<ConnectionFlushBatch> ConnectionFlushBatchPtr; +typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr; // // Class for handling Ice::Communicator::begin_flushBatchRequests // -class ICE_API CommunicatorFlushBatch : public Ice::AsyncResult +class ICE_API CommunicatorFlushBatchAsync : public Ice::AsyncResult { public: - CommunicatorFlushBatch(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, - const CallbackBasePtr&, const Ice::LocalObjectPtr&); + CommunicatorFlushBatchAsync(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, + const CallbackBasePtr&, const Ice::LocalObjectPtr&); void flushConnection(const Ice::ConnectionIPtr&); void ready(); diff --git a/cpp/include/Ice/OutgoingAsyncF.h b/cpp/include/Ice/OutgoingAsyncF.h index ef6b075b082..c889ba32c4c 100644 --- a/cpp/include/Ice/OutgoingAsyncF.h +++ b/cpp/include/Ice/OutgoingAsyncF.h @@ -29,9 +29,9 @@ class ProxyOutgoingAsyncBase; ICE_API IceUtil::Shared* upCast(ProxyOutgoingAsyncBase*); typedef IceInternal::Handle<ProxyOutgoingAsyncBase> ProxyOutgoingAsyncBasePtr; -class CommunicatorFlushBatch; -ICE_API IceUtil::Shared* upCast(CommunicatorFlushBatch*); -typedef IceInternal::Handle<CommunicatorFlushBatch> CommunicatorFlushBatchPtr; +class CommunicatorFlushBatchAsync; +ICE_API IceUtil::Shared* upCast(CommunicatorFlushBatchAsync*); +typedef IceInternal::Handle<CommunicatorFlushBatchAsync> CommunicatorFlushBatchAsyncPtr; } diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index c0257e54a60..0d36364fcc9 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -21,6 +21,7 @@ #include <Ice/ObjectF.h> #include <Ice/ObjectAdapterF.h> #include <Ice/ReferenceF.h> +#include <Ice/BatchRequestQueueF.h> #include <Ice/AsyncResult.h> //#include <Ice/RouterF.h> // Can't include RouterF.h here, otherwise we have cyclic includes //#include <Ice/LocatorF.h> // Can't include RouterF.h here, otherwise we have cyclic includes @@ -844,7 +845,9 @@ public: void __end(const ::Ice::AsyncResultPtr&, const std::string&) const; ::IceInternal::RequestHandlerPtr __getRequestHandler(); - void __setRequestHandler(const ::IceInternal::RequestHandlerPtr&, const ::IceInternal::RequestHandlerPtr&); + ::IceInternal::BatchRequestQueuePtr __getBatchRequestQueue(); + ::IceInternal::RequestHandlerPtr __setRequestHandler(const ::IceInternal::RequestHandlerPtr&); + void __updateRequestHandler(const ::IceInternal::RequestHandlerPtr&, const ::IceInternal::RequestHandlerPtr&); protected: @@ -977,6 +980,7 @@ private: ::IceInternal::ReferencePtr _reference; ::IceInternal::RequestHandlerPtr _requestHandler; + ::IceInternal::BatchRequestQueuePtr _batchRequestQueue; IceUtil::Mutex _mutex; }; |