summaryrefslogtreecommitdiff
path: root/cpp/include
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
committerBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
commitc6ca68d97aa5bbc2a172e3e35171b5452657fa22 (patch)
tree46edcca4c8e313285a205bf6fad7c56c452c0cc0 /cpp/include
parentMinor JS style fixes (diff)
downloadice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.bz2
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.xz
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.zip
ICE-6170 - fixed behavior of batch requests
Diffstat (limited to 'cpp/include')
-rw-r--r--cpp/include/Ice/.headers2
-rw-r--r--cpp/include/Ice/BatchRequestInterceptor.h48
-rw-r--r--cpp/include/Ice/BatchRequestQueueF.h25
-rw-r--r--cpp/include/Ice/Initialize.h6
-rw-r--r--cpp/include/Ice/Outgoing.h104
-rw-r--r--cpp/include/Ice/OutgoingAsync.h63
-rw-r--r--cpp/include/Ice/OutgoingAsyncF.h6
-rw-r--r--cpp/include/Ice/Proxy.h6
8 files changed, 183 insertions, 77 deletions
diff --git a/cpp/include/Ice/.headers b/cpp/include/Ice/.headers
index 5cad480ba31..47a6188f281 100644
--- a/cpp/include/Ice/.headers
+++ b/cpp/include/Ice/.headers
@@ -4,6 +4,8 @@ SDK_HEADERS = \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\AsyncResult.h \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\AsyncResultF.h \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BasicStream.h \
+ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BatchRequestInterceptor.h \
+ $(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BatchRequestQueueF.h \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\Buffer.h \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\BuiltinSequences.h \
$(SDK_INCLUDE_PATH)\$(INCLUDE_DIR)\Communicator.h \
diff --git a/cpp/include/Ice/BatchRequestInterceptor.h b/cpp/include/Ice/BatchRequestInterceptor.h
new file mode 100644
index 00000000000..4c05546cdf0
--- /dev/null
+++ b/cpp/include/Ice/BatchRequestInterceptor.h
@@ -0,0 +1,48 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_BATCH_REQUEST_INTERCEPTOR_H
+#define ICE_BATCH_REQUEST_INTERCEPTOR_H
+
+#include <IceUtil/Shared.h>
+
+#include <Ice/ProxyF.h>
+#ifdef ICE_CPP11
+# include <functional>
+#endif
+
+namespace Ice
+{
+
+class BatchRequest
+{
+public:
+
+ virtual void enqueue() const = 0;
+ virtual int getSize() const = 0;
+ virtual const std::string& getOperation() const = 0;
+ virtual const Ice::ObjectPrx& getProxy() const = 0;
+};
+
+class BatchRequestInterceptor : public IceUtil::Shared
+{
+public:
+
+ virtual void enqueue(const BatchRequest&, int, int) = 0;
+};
+typedef IceUtil::Handle<BatchRequestInterceptor> BatchRequestInterceptorPtr;
+
+#ifdef ICE_CPP11
+ICE_API BatchRequestInterceptorPtr
+newBatchRequestInterceptor(const ::std::function<void (const BatchRequest&, int, int)>&);
+#endif
+
+};
+
+#endif
diff --git a/cpp/include/Ice/BatchRequestQueueF.h b/cpp/include/Ice/BatchRequestQueueF.h
new file mode 100644
index 00000000000..3d33a0e0dc8
--- /dev/null
+++ b/cpp/include/Ice/BatchRequestQueueF.h
@@ -0,0 +1,25 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_BATCH_REQUEST_QUEUE_F_H
+#define ICE_BATCH_REQUEST_QUEUE_F_H
+
+#include <IceUtil/Shared.h>
+#include <Ice/Handle.h>
+
+namespace IceInternal
+{
+
+class BatchRequestQueue;
+ICE_API IceUtil::Shared* upCast(BatchRequestQueue*);
+typedef IceInternal::Handle<BatchRequestQueue> BatchRequestQueuePtr;
+
+}
+
+#endif
diff --git a/cpp/include/Ice/Initialize.h b/cpp/include/Ice/Initialize.h
index e90d98803d2..8cbeeed2c91 100644
--- a/cpp/include/Ice/Initialize.h
+++ b/cpp/include/Ice/Initialize.h
@@ -22,6 +22,7 @@
#include <Ice/BuiltinSequences.h>
#include <Ice/Version.h>
#include <Ice/Plugin.h>
+#include <Ice/BatchRequestInterceptor.h>
namespace Ice
{
@@ -86,6 +87,7 @@ struct InitializationData
ThreadNotificationPtr threadHook;
DispatcherPtr dispatcher;
CompactIdResolverPtr compactIdResolver;
+ BatchRequestInterceptorPtr batchRequestInterceptor;
};
ICE_API CommunicatorPtr initialize(int&, char*[], const InitializationData& = InitializationData(),
@@ -94,7 +96,7 @@ ICE_API CommunicatorPtr initialize(int&, char*[], const InitializationData& = In
ICE_API CommunicatorPtr initialize(Ice::StringSeq&, const InitializationData& = InitializationData(),
Int = ICE_INT_VERSION);
-ICE_API CommunicatorPtr initialize(const InitializationData& = InitializationData(),
+ICE_API CommunicatorPtr initialize(const InitializationData& = InitializationData(),
Int = ICE_INT_VERSION);
@@ -111,7 +113,7 @@ ICE_API InputStreamPtr createInputStream(const CommunicatorPtr&,
ICE_API InputStreamPtr wrapInputStream(const CommunicatorPtr&,
const ::std::pair< const Ice::Byte*, const Ice::Byte*>&);
ICE_API InputStreamPtr wrapInputStream(const CommunicatorPtr&,
- const ::std::pair< const Ice::Byte*, const Ice::Byte*>&,
+ const ::std::pair< const Ice::Byte*, const Ice::Byte*>&,
const EncodingVersion&);
ICE_API OutputStreamPtr createOutputStream(const CommunicatorPtr&);
diff --git a/cpp/include/Ice/Outgoing.h b/cpp/include/Ice/Outgoing.h
index fb469a2ca6c..62aea1f7c63 100644
--- a/cpp/include/Ice/Outgoing.h
+++ b/cpp/include/Ice/Outgoing.h
@@ -40,12 +40,10 @@ class ICE_API OutgoingBase : private IceUtil::noncopyable
public:
virtual ~OutgoingBase() { }
-
- virtual bool send(const Ice::ConnectionIPtr&, bool, bool) = 0;
- virtual void invokeCollocated(CollocatedRequestHandler*) = 0;
virtual void sent() = 0;
virtual void completed(const Ice::Exception&) = 0;
+ virtual void completed(BasicStream&) = 0;
virtual void retryException(const Ice::Exception&) = 0;
BasicStream* os() { return &_os; }
@@ -64,7 +62,7 @@ public:
protected:
- OutgoingBase(Instance*, const std::string&);
+ OutgoingBase(Instance*);
BasicStream _os;
IceUtil::UniquePtr<Ice::Exception> _exception;
@@ -75,23 +73,60 @@ protected:
IceUtil::Monitor<IceUtil::Mutex> _monitor;
};
-class ICE_API Outgoing : public OutgoingBase
+class ICE_API ProxyOutgoingBase : public OutgoingBase
{
public:
- Outgoing(IceProxy::Ice::Object*, const std::string&, Ice::OperationMode, const Ice::Context*);
- ~Outgoing();
+ ProxyOutgoingBase(IceProxy::Ice::Object*, Ice::OperationMode);
+ ~ProxyOutgoingBase();
- virtual bool send(const Ice::ConnectionIPtr&, bool, bool);
- virtual void invokeCollocated(CollocatedRequestHandler*);
+ virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0;
+ virtual void invokeCollocated(CollocatedRequestHandler*) = 0;
virtual void sent();
virtual void completed(const Ice::Exception&);
+ virtual void completed(BasicStream&);
virtual void retryException(const Ice::Exception&);
+protected:
+
+ bool invokeImpl(); // Returns true if ok, false if user exception.
+
+ //
+ // Optimization. The request handler and the reference may not be
+ // deleted while a stack-allocated Outgoing still holds it.
+ //
+ IceProxy::Ice::Object* _proxy;
+ Ice::OperationMode _mode;
+ RequestHandlerPtr _handler;
+ IceUtil::Time _invocationTimeoutDeadline;
+
+ enum
+ {
+ StateUnsent,
+ StateInProgress,
+ StateRetry,
+ StateOK,
+ StateUserException,
+ StateLocalException,
+ StateFailed
+ } _state;
+};
+
+class ICE_API Outgoing : public ProxyOutgoingBase
+{
+public:
+
+ Outgoing(IceProxy::Ice::Object*, const std::string&, Ice::OperationMode, const Ice::Context*);
+ ~Outgoing();
+
+ virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
+ virtual void invokeCollocated(CollocatedRequestHandler*);
+
bool invoke(); // Returns true if ok, false if user exception.
void abort(const Ice::LocalException&);
- void completed(BasicStream&);
+
+ virtual void completed(BasicStream&);
// Inlined for speed optimization.
BasicStream* startReadParams()
@@ -137,7 +172,7 @@ public:
}
}
- bool hasResponse()
+ bool hasResponse()
{
return !_is.b.empty();
}
@@ -146,49 +181,42 @@ public:
private:
- //
- // Optimization. The request handler and the reference may not be
- // deleted while a stack-allocated Outgoing still holds it.
- //
- IceProxy::Ice::Object* _proxy;
- Ice::OperationMode _mode;
- RequestHandlerPtr _handler;
- IceUtil::Time _invocationTimeoutDeadline;
-
- enum
- {
- StateUnsent,
- StateInProgress,
- StateRetry,
- StateOK,
- StateUserException,
- StateLocalException,
- StateFailed
- } _state;
-
Ice::EncodingVersion _encoding;
BasicStream _is;
+ const std::string& _operation;
};
-class FlushBatch : public OutgoingBase
+class ProxyFlushBatch : public ProxyOutgoingBase
{
public:
- FlushBatch(IceProxy::Ice::Object*, const std::string&);
- FlushBatch(Ice::ConnectionI*, Instance*, const std::string&);
-
- void invoke();
+ ProxyFlushBatch(IceProxy::Ice::Object*, const std::string&);
- virtual bool send(const Ice::ConnectionIPtr&, bool, bool);
+ virtual bool invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
virtual void invokeCollocated(CollocatedRequestHandler*);
+ void invoke();
+
+private:
+
+ int _batchRequestNum;
+};
+
+class ConnectionFlushBatch : public OutgoingBase
+{
+public:
+
+ ConnectionFlushBatch(Ice::ConnectionI*, Instance*, const std::string&);
+
+ void invoke();
+
virtual void sent();
virtual void completed(const Ice::Exception&);
+ virtual void completed(BasicStream&);
virtual void retryException(const Ice::Exception&);
private:
- IceProxy::Ice::Object* _proxy;
Ice::ConnectionI* _connection;
};
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index ed1e72f0521..440a39138fb 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -32,16 +32,9 @@ class ICE_API OutgoingAsyncBase : public Ice::AsyncResult
{
public:
- //
- // Those methods must be overriden if the invocation is sent
- // through a request handler.
- //
- virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool) { assert(false); return AsyncStatusQueued; }
- virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) { assert(false); return AsyncStatusQueued; }
-
virtual bool sent();
virtual bool completed(const Ice::Exception&);
- virtual void retryException(const Ice::Exception&);
+ virtual bool completed();
// Those methods are public when called from an OutgoingAsyncBase reference.
using Ice::AsyncResult::cancelable;
@@ -55,7 +48,7 @@ public:
const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4);
_childObserver.attach(getObserver().getRemoteObserver(c, endpt, requestId, size));
}
-
+
void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, Ice::Int requestId)
{
const Ice::Int size = static_cast<Ice::Int>(_os.b.size() - headerSize - 4);
@@ -67,9 +60,11 @@ public:
return &_os;
}
+ virtual BasicStream* getIs();
+
protected:
- OutgoingAsyncBase(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&,
+ OutgoingAsyncBase(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&, const CallbackBasePtr&,
const Ice::LocalObjectPtr&);
bool sent(bool);
@@ -90,11 +85,14 @@ class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, protected IceUt
{
public:
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0;
+ virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) = 0;
+
virtual Ice::ObjectPrx getProxy() const;
using OutgoingAsyncBase::sent;
virtual bool completed(const Ice::Exception&);
- virtual void retryException(const Ice::Exception&);
+ void retryException(const Ice::Exception&);
virtual void cancelable(const CancellationHandlerPtr&);
void retry();
@@ -102,7 +100,7 @@ public:
protected:
- ProxyOutgoingAsyncBase(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&,
+ ProxyOutgoingAsyncBase(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&,
const Ice::LocalObjectPtr&);
void invokeImpl(bool);
@@ -111,8 +109,7 @@ protected:
bool finished(const Ice::Exception&);
bool finished(bool);
- virtual void handleRetryException(const Ice::Exception&);
- virtual int handleException(const Ice::Exception&);
+ int handleException(const Ice::Exception&);
virtual void runTimerTask();
const Ice::ObjectPrx _proxy;
@@ -138,14 +135,15 @@ public:
virtual bool sent();
- virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool);
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
+ using ProxyOutgoingAsyncBase::completed;
+ virtual bool completed();
+
void abort(const Ice::Exception&);
void invoke();
- using ProxyOutgoingAsyncBase::completed;
- bool completed();
BasicStream* startWriteParams(Ice::FormatType format)
{
@@ -172,7 +170,7 @@ public:
}
}
- BasicStream* getIs()
+ virtual BasicStream* getIs()
{
return &_is;
}
@@ -185,23 +183,22 @@ private:
//
// Class for handling the proxy's begin_ice_flushBatchRequest request.
//
-class ICE_API ProxyFlushBatch : public ProxyOutgoingAsyncBase
+class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase
{
public:
- ProxyFlushBatch(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&);
+ ProxyFlushBatchAsync(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&);
- virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool);
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
void invoke();
private:
- virtual void handleRetryException(const Ice::Exception&);
- virtual int handleException(const Ice::Exception&);
+ int _batchRequestNum;
};
-typedef IceUtil::Handle<ProxyFlushBatch> ProxyFlushBatchPtr;
+typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr;
//
// Class for handling the proxy's begin_ice_getConnection request.
@@ -212,7 +209,7 @@ public:
ProxyGetConnection(const Ice::ObjectPrx&, const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&);
- virtual AsyncStatus send(const Ice::ConnectionIPtr&, bool, bool);
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
void invoke();
@@ -222,13 +219,13 @@ typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr;
//
// Class for handling Ice::Connection::begin_flushBatchRequests
//
-class ICE_API ConnectionFlushBatch : public OutgoingAsyncBase
+class ICE_API ConnectionFlushBatchAsync : public OutgoingAsyncBase
{
public:
- ConnectionFlushBatch(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&,
- const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&);
-
+ ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const Ice::CommunicatorPtr&, const InstancePtr&,
+ const std::string&, const CallbackBasePtr&, const Ice::LocalObjectPtr&);
+
virtual Ice::ConnectionPtr getConnection() const;
void invoke();
@@ -237,17 +234,17 @@ private:
const Ice::ConnectionIPtr _connection;
};
-typedef IceUtil::Handle<ConnectionFlushBatch> ConnectionFlushBatchPtr;
+typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr;
//
// Class for handling Ice::Communicator::begin_flushBatchRequests
//
-class ICE_API CommunicatorFlushBatch : public Ice::AsyncResult
+class ICE_API CommunicatorFlushBatchAsync : public Ice::AsyncResult
{
public:
- CommunicatorFlushBatch(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&,
- const CallbackBasePtr&, const Ice::LocalObjectPtr&);
+ CommunicatorFlushBatchAsync(const Ice::CommunicatorPtr&, const InstancePtr&, const std::string&,
+ const CallbackBasePtr&, const Ice::LocalObjectPtr&);
void flushConnection(const Ice::ConnectionIPtr&);
void ready();
diff --git a/cpp/include/Ice/OutgoingAsyncF.h b/cpp/include/Ice/OutgoingAsyncF.h
index ef6b075b082..c889ba32c4c 100644
--- a/cpp/include/Ice/OutgoingAsyncF.h
+++ b/cpp/include/Ice/OutgoingAsyncF.h
@@ -29,9 +29,9 @@ class ProxyOutgoingAsyncBase;
ICE_API IceUtil::Shared* upCast(ProxyOutgoingAsyncBase*);
typedef IceInternal::Handle<ProxyOutgoingAsyncBase> ProxyOutgoingAsyncBasePtr;
-class CommunicatorFlushBatch;
-ICE_API IceUtil::Shared* upCast(CommunicatorFlushBatch*);
-typedef IceInternal::Handle<CommunicatorFlushBatch> CommunicatorFlushBatchPtr;
+class CommunicatorFlushBatchAsync;
+ICE_API IceUtil::Shared* upCast(CommunicatorFlushBatchAsync*);
+typedef IceInternal::Handle<CommunicatorFlushBatchAsync> CommunicatorFlushBatchAsyncPtr;
}
diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h
index c0257e54a60..0d36364fcc9 100644
--- a/cpp/include/Ice/Proxy.h
+++ b/cpp/include/Ice/Proxy.h
@@ -21,6 +21,7 @@
#include <Ice/ObjectF.h>
#include <Ice/ObjectAdapterF.h>
#include <Ice/ReferenceF.h>
+#include <Ice/BatchRequestQueueF.h>
#include <Ice/AsyncResult.h>
//#include <Ice/RouterF.h> // Can't include RouterF.h here, otherwise we have cyclic includes
//#include <Ice/LocatorF.h> // Can't include RouterF.h here, otherwise we have cyclic includes
@@ -844,7 +845,9 @@ public:
void __end(const ::Ice::AsyncResultPtr&, const std::string&) const;
::IceInternal::RequestHandlerPtr __getRequestHandler();
- void __setRequestHandler(const ::IceInternal::RequestHandlerPtr&, const ::IceInternal::RequestHandlerPtr&);
+ ::IceInternal::BatchRequestQueuePtr __getBatchRequestQueue();
+ ::IceInternal::RequestHandlerPtr __setRequestHandler(const ::IceInternal::RequestHandlerPtr&);
+ void __updateRequestHandler(const ::IceInternal::RequestHandlerPtr&, const ::IceInternal::RequestHandlerPtr&);
protected:
@@ -977,6 +980,7 @@ private:
::IceInternal::ReferencePtr _reference;
::IceInternal::RequestHandlerPtr _requestHandler;
+ ::IceInternal::BatchRequestQueuePtr _batchRequestQueue;
IceUtil::Mutex _mutex;
};