author    Benoit Foucher <benoit@zeroc.com>    2015-03-10 12:12:10 +0100
committer Benoit Foucher <benoit@zeroc.com>    2015-03-10 12:12:10 +0100
commit    c6ca68d97aa5bbc2a172e3e35171b5452657fa22 (patch)
tree      46edcca4c8e313285a205bf6fad7c56c452c0cc0 /cpp/src/Ice/CollocatedRequestHandler.cpp
parent    Minor JS style fixes (diff)
ICE-6170 - fixed behavior of batch requests
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--  cpp/src/Ice/CollocatedRequestHandler.cpp | 332
1 file changed, 52 insertions(+), 280 deletions(-)
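
Summary of the change: the (invokeNum, batch) argument pair on InvokeAll, InvokeAllAsync and
CollocatedRequestHandler::invokeAll is replaced by a single batchRequestNum, and the per-handler
batch stream machinery (prepareBatchRequest/finishBatchRequest/abortBatchRequest, the
InvokeBatchRequests work item and invoke*BatchRequests) is removed. The snippet below is a
standalone illustration of the new convention, not code from the commit; dispatchCount is a
hypothetical helper that mirrors the "invokeNum = batchRequestNum > 0 ? batchRequestNum : 1"
logic added to invokeAll in the diff.

#include <iostream>

// Illustration only: batchRequestNum == 0 means a plain (non-batch) request,
// batchRequestNum > 0 means a batch containing that many requests, so a single
// counter now encodes what the old invokeNum/batch pair expressed.
int dispatchCount(int batchRequestNum)
{
    return batchRequestNum > 0 ? batchRequestNum : 1;
}

int main()
{
    std::cout << dispatchCount(0) << '\n'; // plain request -> dispatch once
    std::cout << dispatchCount(5) << '\n'; // batch of 5    -> dispatch five times
    return 0;
}
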
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 8b2e505d8d7..d2e78fcd4a9 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -29,14 +29,13 @@ namespace
class InvokeAll : public DispatchWorkItem
{
public:
-
+
InvokeAll(OutgoingBase* out,
BasicStream* os,
- CollocatedRequestHandler* handler,
+ CollocatedRequestHandler* handler,
Int requestId,
- Int invokeNum,
- bool batch) :
- _out(out), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch)
+ Int batchRequestNum) :
+ _out(out), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum)
{
}
@@ -45,31 +44,29 @@ public:
{
if(_handler->sent(_out))
{
- _handler->invokeAll(_os, _requestId, _invokeNum, _batch);
+ _handler->invokeAll(_os, _requestId, _batchRequestNum);
}
}
-
+
private:
-
+
OutgoingBase* _out;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
- Int _invokeNum;
- bool _batch;
+ Int _batchRequestNum;
};
class InvokeAllAsync : public DispatchWorkItem
{
public:
-
+
InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync,
BasicStream* os,
- CollocatedRequestHandler* handler,
+ CollocatedRequestHandler* handler,
Int requestId,
- Int invokeNum,
- bool batch) :
- _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch)
+ Int batchRequestNum) :
+ _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum)
{
}
@@ -78,46 +75,17 @@ public:
{
if(_handler->sentAsync(_outAsync.get()))
{
- _handler->invokeAll(_os, _requestId, _invokeNum, _batch);
+ _handler->invokeAll(_os, _requestId, _batchRequestNum);
}
}
-
+
private:
-
+
OutgoingAsyncBasePtr _outAsync;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
- Int _invokeNum;
- bool _batch;
-};
-
-class InvokeBatchRequests : public DispatchWorkItem
-{
-public:
-
- InvokeBatchRequests(const CollocatedRequestHandlerPtr& handler,
- BasicStream& stream,
- Int invokeNum) :
- _handler(handler),
- _stream(stream.instance(), currentProtocolEncoding),
- _invokeNum(invokeNum)
- {
- _stream.swap(stream);
- }
-
- virtual void
- run()
- {
- _handler->invokeAll(&_stream, 0, _invokeNum, true);
- }
-
-private:
-
- const CollocatedRequestHandlerPtr _handler;
- const OutgoingAsyncBasePtr _outAsync;
- BasicStream _stream;
- Int _invokeNum;
+ Int _batchRequestNum;
};
void
@@ -134,16 +102,12 @@ fillInValue(BasicStream* os, int pos, Int value)
}
CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) :
- RequestHandler(ref),
- _adapter(ObjectAdapterIPtr::dynamicCast(adapter)),
+ RequestHandler(ref),
+ _adapter(ObjectAdapterIPtr::dynamicCast(adapter)),
_dispatcher(_reference->getInstance()->initializationData().dispatcher),
_logger(_reference->getInstance()->initializationData().logger), // Cached for better performance.
_traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance.
- _batchAutoFlushSize(ref->getInstance()->batchAutoFlushSize()),
- _requestId(0),
- _batchStreamInUse(false),
- _batchRequestNum(0),
- _batchStream(ref->getInstance().get(), currentProtocolEncoding)
+ _requestId(0)
{
}
@@ -152,124 +116,29 @@ CollocatedRequestHandler::~CollocatedRequestHandler()
}
RequestHandlerPtr
-CollocatedRequestHandler::connect(const Ice::ObjectPrx&)
-{
- return this;
-}
-
-RequestHandlerPtr
CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler)
{
return previousHandler.get() == this ? newHandler : this;
}
-void
-CollocatedRequestHandler::prepareBatchRequest(BasicStream* os)
-{
- Lock sync(*this);
- while(_batchStreamInUse)
- {
- wait();
- }
-
- if(_batchStream.b.empty())
- {
- try
- {
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- }
- catch(const LocalException& ex)
- {
- ex.ice_throw();
- }
- }
-
- _batchStreamInUse = true;
- _batchMarker = _batchStream.b.size();
- _batchStream.swap(*os);
-}
-
-void
-CollocatedRequestHandler::finishBatchRequest(BasicStream* os)
-{
- try
- {
- Lock sync(*this);
- _batchStream.swap(*os);
-
- if(_batchAutoFlushSize > 0 && (_batchStream.b.size() > _batchAutoFlushSize))
- {
- //
- // Temporarily save the last request.
- //
- vector<Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
- _batchStream.b.resize(_batchMarker);
-
- _adapter->getThreadPool()->dispatch(new InvokeBatchRequests(this, _batchStream, _batchRequestNum));
-
- //
- // Reset the batch.
- //
- BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
-
- //
- // Start a new batch with the last message that caused us to go over the limit.
- //
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
- }
-
- //
- // Increment the number of requests in the batch.
- //
- assert(_batchStreamInUse);
- ++_batchRequestNum;
- _batchStreamInUse = false;
- notifyAll();
- }
- catch(const LocalException&)
- {
- abortBatchRequest();
- throw;
- }
-}
-
-void
-CollocatedRequestHandler::abortBatchRequest()
-{
- Lock sync(*this);
-
- BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
-
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
-}
-
bool
-CollocatedRequestHandler::sendRequest(OutgoingBase* out)
+CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out)
{
out->invokeCollocated(this);
return !_response && _reference->getInvocationTimeout() == 0;
}
AsyncStatus
-CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync)
+CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outAsync)
{
return outAsync->invokeCollocated(this);
}
-void
+void
CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex)
{
Lock sync(*this);
-
+
map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out);
if(p != _sendRequests.end())
{
@@ -286,12 +155,12 @@ CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalExceptio
Outgoing* o = dynamic_cast<Outgoing*>(out);
if(o)
{
- for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
+ for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
{
if(q->second == o)
{
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->completed(ex);
+ q->second->completed(ex);
_requests.erase(q);
return; // We're done.
}
@@ -322,7 +191,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
if(o)
{
- for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ for(map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
if(q->second.get() == o.get())
{
@@ -338,7 +207,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
}
void
-CollocatedRequestHandler::invokeRequest(Outgoing* out)
+CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
{
int requestId = 0;
{
@@ -348,29 +217,31 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out)
requestId = ++_requestId;
_requests.insert(make_pair(requestId, out));
}
+
_sendRequests.insert(make_pair(out, requestId));
- }
+ }
out->attachCollocatedObserver(_adapter, requestId);
if(_reference->getInvocationTimeout() > 0)
{
// Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
- _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum));
}
else if(_dispatcher)
{
- _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId,
+ batchRequestNum));
}
else // Optimization: directly call invokeAll if there's no dispatcher.
{
out->sent();
- invokeAll(out->os(), requestId, 1, false);
+ invokeAll(out->os(), requestId, batchRequestNum);
}
}
AsyncStatus
-CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
+CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum)
{
int requestId = 0;
{
@@ -383,125 +254,25 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
requestId = ++_requestId;
_asyncRequests.insert(make_pair(requestId, outAsync));
}
+
_sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
outAsync->attachCollocatedObserver(_adapter, requestId);
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId,
+ batchRequestNum));
return AsyncStatusQueued;
}
void
-CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out)
-{
- Int invokeNum;
- {
- Lock sync(*this);
- while(_batchStreamInUse)
- {
- wait();
- }
-
- invokeNum = _batchRequestNum;
-
- if(_batchRequestNum > 0)
- {
- _sendRequests.insert(make_pair(out, 0));
-
- assert(!_batchStream.b.empty());
- _batchStream.swap(*out->os());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- out->attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- if(_reference->getInvocationTimeout() > 0)
- {
- _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
- }
- else if(_dispatcher)
- {
- _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
- }
- else // Optimization: directly call invokeAll if there's no dispatcher.
- {
- out->sent();
- invokeAll(out->os(), 0, invokeNum, true); // Invoke from the user thread.
- }
- }
- else
- {
- out->sent();
- }
-}
-
-AsyncStatus
-CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
-{
- Int invokeNum;
- {
- Lock sync(*this);
- while(_batchStreamInUse)
- {
- wait();
- }
-
- invokeNum = _batchRequestNum;
- if(_batchRequestNum > 0)
- {
- outAsync->cancelable(this); // This will throw if the request is canceled
-
- _sendAsyncRequests.insert(make_pair(outAsync, 0));
-
- assert(!_batchStream.b.empty());
- _batchStream.swap(*outAsync->getOs());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- outAsync->attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true));
- return AsyncStatusQueued;
- }
- else if(outAsync->sent())
- {
- return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
- }
- else
- {
- return AsyncStatusSent;
- }
-}
-
-void
CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd)
{
- OutgoingAsyncPtr outAsync;
+ OutgoingAsyncBasePtr outAsync;
{
Lock sync(*this);
assert(_response);
-
+
os->i = os->b.begin() + sizeof(replyHdr) + 4;
if(_traceLevels->protocol >= 1)
@@ -510,7 +281,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo
traceRecv(*os, _logger, _traceLevels);
}
- map<int, Outgoing*>::iterator p = _requests.find(requestId);
+ map<int, OutgoingBase*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
p->second->completed(*os);
@@ -518,7 +289,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo
}
else
{
- map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
+ map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
os->swap(*q->second->getIs());
@@ -565,7 +336,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException&
return true;
}
-void
+void
CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum, bool amd)
{
handleException(requestId, ex, amd);
@@ -616,9 +387,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
}
void
-CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNum, bool batch)
+CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchRequestNum)
{
- if(batch)
+ if(batchRequestNum > 0)
{
os->i = os->b.begin() + sizeof(requestBatchHdr);
}
@@ -634,18 +405,19 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
{
fillInValue(os, headerSize, requestId);
}
- else if(batch)
+ else if(batchRequestNum > 0)
{
- fillInValue(os, headerSize, invokeNum);
+ fillInValue(os, headerSize, batchRequestNum);
}
traceSend(*os, _logger, _traceLevels);
}
+ int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1;
ServantManagerPtr servantManager = _adapter->getServantManager();
try
{
while(invokeNum > 0)
- {
+ {
try
{
_adapter->incDirectCount();
@@ -675,11 +447,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
return; // Ignore exception for oneway messages.
}
- OutgoingAsyncPtr outAsync;
+ OutgoingAsyncBasePtr outAsync;
{
Lock sync(*this);
-
- map<int, Outgoing*>::iterator p = _requests.find(requestId);
+
+ map<int, OutgoingBase*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
p->second->completed(ex);
@@ -687,7 +459,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
}
else
{
- map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
+ map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
if(q->second->completed(ex))