summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/BatchRequestQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/BatchRequestQueue.cpp')
-rw-r--r--cpp/src/Ice/BatchRequestQueue.cpp50
1 files changed, 39 insertions, 11 deletions
diff --git a/cpp/src/Ice/BatchRequestQueue.cpp b/cpp/src/Ice/BatchRequestQueue.cpp
index f00109eade9..0a8d1927da1 100644
--- a/cpp/src/Ice/BatchRequestQueue.cpp
+++ b/cpp/src/Ice/BatchRequestQueue.cpp
@@ -10,8 +10,10 @@
#include <Ice/BatchRequestQueue.h>
#include <Ice/Instance.h>
#include <Ice/Properties.h>
+#include <Ice/Reference.h>
using namespace std;
+using namespace Ice;
using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(BatchRequestQueue* p) { return p; }
@@ -25,7 +27,7 @@ class BatchRequestI : public Ice::BatchRequest
{
public:
- BatchRequestI(BatchRequestQueue& queue, const Ice::ObjectPrx& proxy, const string& operation, int size) :
+ BatchRequestI(BatchRequestQueue& queue, const Ice::ObjectPrxPtr& proxy, const string& operation, int size) :
_queue(queue), _proxy(proxy), _operation(operation), _size(size)
{
}
@@ -33,7 +35,7 @@ public:
virtual void
enqueue() const
{
- _queue.enqueueBatchRequest();
+ _queue.enqueueBatchRequest(_proxy);
}
virtual int
@@ -48,7 +50,7 @@ public:
return _operation;
}
- virtual const Ice::ObjectPrx&
+ virtual const Ice::ObjectPrxPtr&
getProxy() const
{
return _proxy;
@@ -57,7 +59,7 @@ public:
private:
BatchRequestQueue& _queue;
- const Ice::ObjectPrx& _proxy;
+ const Ice::ObjectPrxPtr& _proxy;
const std::string& _operation;
const int _size;
};
@@ -69,6 +71,7 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram)
_batchStream(instance.get(), Ice::currentProtocolEncoding),
_batchStreamInUse(false),
_batchStreamCanFlush(false),
+ _batchCompress(false),
_batchRequestNum(0)
{
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
@@ -87,21 +90,22 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram)
}
void
-BatchRequestQueue::prepareBatchRequest(BasicStream* os)
+BatchRequestQueue::prepareBatchRequest(OutputStream* os)
{
Lock sync(*this);
- if(_exception.get())
+ if(_exception)
{
_exception->ice_throw();
}
-
waitStreamInUse(false);
_batchStreamInUse = true;
_batchStream.swap(*os);
}
void
-BatchRequestQueue::finishBatchRequest(BasicStream* os, const Ice::ObjectPrx& proxy, const std::string& operation)
+BatchRequestQueue::finishBatchRequest(OutputStream* os,
+ const Ice::ObjectPrxPtr& proxy,
+ const std::string& operation)
{
//
// No need for synchronization, no other threads are supposed
@@ -116,17 +120,30 @@ BatchRequestQueue::finishBatchRequest(BasicStream* os, const Ice::ObjectPrx& pro
if(_maxSize > 0 && _batchStream.b.size() >= _maxSize)
{
+#ifdef ICE_CPP11_MAPPING
+ proxy->ice_flushBatchRequestsAsync();
+#else
proxy->begin_ice_flushBatchRequests();
+#endif
}
assert(_batchMarker < _batchStream.b.size());
if(_interceptor)
{
BatchRequestI request(*this, proxy, operation, static_cast<int>(_batchStream.b.size() - _batchMarker));
+#ifdef ICE_CPP11_MAPPING
+ _interceptor(request, _batchRequestNum, static_cast<int>(_batchMarker));
+#else
_interceptor->enqueue(request, _batchRequestNum, static_cast<int>(_batchMarker));
+#endif
}
else
{
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}
@@ -149,7 +166,7 @@ BatchRequestQueue::finishBatchRequest(BasicStream* os, const Ice::ObjectPrx& pro
}
void
-BatchRequestQueue::abortBatchRequest(BasicStream* os)
+BatchRequestQueue::abortBatchRequest(OutputStream* os)
{
Lock sync(*this);
if(_batchStreamInUse)
@@ -162,7 +179,7 @@ BatchRequestQueue::abortBatchRequest(BasicStream* os)
}
int
-BatchRequestQueue::swap(BasicStream* os)
+BatchRequestQueue::swap(OutputStream* os, bool& compress)
{
Lock sync(*this);
if(_batchRequestNum == 0)
@@ -181,11 +198,13 @@ BatchRequestQueue::swap(BasicStream* os)
int requestNum = _batchRequestNum;
_batchStream.swap(*os);
+ compress = _batchCompress;
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
_batchMarker = _batchStream.b.size();
if(!lastRequest.empty())
@@ -199,7 +218,11 @@ void
BatchRequestQueue::destroy(const Ice::LocalException& ex)
{
Lock sync(*this);
+#ifdef ICE_CPP11_MAPPING
+ _exception = ex.ice_clone();
+#else
_exception.reset(ex.ice_clone());
+#endif
}
bool
@@ -219,9 +242,14 @@ BatchRequestQueue::waitStreamInUse(bool flush)
}
void
-BatchRequestQueue::enqueueBatchRequest()
+BatchRequestQueue::enqueueBatchRequest(const Ice::ObjectPrxPtr& proxy)
{
assert(_batchMarker < _batchStream.b.size());
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}