diff options
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 332 |
1 files changed, 52 insertions, 280 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 8b2e505d8d7..d2e78fcd4a9 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -29,14 +29,13 @@ namespace class InvokeAll : public DispatchWorkItem { public: - + InvokeAll(OutgoingBase* out, BasicStream* os, - CollocatedRequestHandler* handler, + CollocatedRequestHandler* handler, Int requestId, - Int invokeNum, - bool batch) : - _out(out), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch) + Int batchRequestNum) : + _out(out), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum) { } @@ -45,31 +44,29 @@ public: { if(_handler->sent(_out)) { - _handler->invokeAll(_os, _requestId, _invokeNum, _batch); + _handler->invokeAll(_os, _requestId, _batchRequestNum); } } - + private: - + OutgoingBase* _out; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; - Int _invokeNum; - bool _batch; + Int _batchRequestNum; }; class InvokeAllAsync : public DispatchWorkItem { public: - + InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync, BasicStream* os, - CollocatedRequestHandler* handler, + CollocatedRequestHandler* handler, Int requestId, - Int invokeNum, - bool batch) : - _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _invokeNum(invokeNum), _batch(batch) + Int batchRequestNum) : + _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum) { } @@ -78,46 +75,17 @@ public: { if(_handler->sentAsync(_outAsync.get())) { - _handler->invokeAll(_os, _requestId, _invokeNum, _batch); + _handler->invokeAll(_os, _requestId, _batchRequestNum); } } - + private: - + OutgoingAsyncBasePtr _outAsync; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; - Int _invokeNum; - bool _batch; -}; - -class InvokeBatchRequests : public DispatchWorkItem -{ -public: - - InvokeBatchRequests(const CollocatedRequestHandlerPtr& handler, - BasicStream& stream, - Int invokeNum) : - _handler(handler), - _stream(stream.instance(), currentProtocolEncoding), - _invokeNum(invokeNum) - { - _stream.swap(stream); - } - - virtual void - run() - { - _handler->invokeAll(&_stream, 0, _invokeNum, true); - } - -private: - - const CollocatedRequestHandlerPtr _handler; - const OutgoingAsyncBasePtr _outAsync; - BasicStream _stream; - Int _invokeNum; + Int _batchRequestNum; }; void @@ -134,16 +102,12 @@ fillInValue(BasicStream* os, int pos, Int value) } CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) : - RequestHandler(ref), - _adapter(ObjectAdapterIPtr::dynamicCast(adapter)), + RequestHandler(ref), + _adapter(ObjectAdapterIPtr::dynamicCast(adapter)), _dispatcher(_reference->getInstance()->initializationData().dispatcher), _logger(_reference->getInstance()->initializationData().logger), // Cached for better performance. _traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance. - _batchAutoFlushSize(ref->getInstance()->batchAutoFlushSize()), - _requestId(0), - _batchStreamInUse(false), - _batchRequestNum(0), - _batchStream(ref->getInstance().get(), currentProtocolEncoding) + _requestId(0) { } @@ -152,124 +116,29 @@ CollocatedRequestHandler::~CollocatedRequestHandler() } RequestHandlerPtr -CollocatedRequestHandler::connect(const Ice::ObjectPrx&) -{ - return this; -} - -RequestHandlerPtr CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler) { return previousHandler.get() == this ? newHandler : this; } -void -CollocatedRequestHandler::prepareBatchRequest(BasicStream* os) -{ - Lock sync(*this); - while(_batchStreamInUse) - { - wait(); - } - - if(_batchStream.b.empty()) - { - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - } - catch(const LocalException& ex) - { - ex.ice_throw(); - } - } - - _batchStreamInUse = true; - _batchMarker = _batchStream.b.size(); - _batchStream.swap(*os); -} - -void -CollocatedRequestHandler::finishBatchRequest(BasicStream* os) -{ - try - { - Lock sync(*this); - _batchStream.swap(*os); - - if(_batchAutoFlushSize > 0 && (_batchStream.b.size() > _batchAutoFlushSize)) - { - // - // Temporarily save the last request. - // - vector<Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()); - _batchStream.b.resize(_batchMarker); - - _adapter->getThreadPool()->dispatch(new InvokeBatchRequests(this, _batchStream, _batchRequestNum)); - - // - // Reset the batch. - // - BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - - // - // Start a new batch with the last message that caused us to go over the limit. - // - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); - } - - // - // Increment the number of requests in the batch. - // - assert(_batchStreamInUse); - ++_batchRequestNum; - _batchStreamInUse = false; - notifyAll(); - } - catch(const LocalException&) - { - abortBatchRequest(); - throw; - } -} - -void -CollocatedRequestHandler::abortBatchRequest() -{ - Lock sync(*this); - - BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); -} - bool -CollocatedRequestHandler::sendRequest(OutgoingBase* out) +CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out) { out->invokeCollocated(this); return !_response && _reference->getInvocationTimeout() == 0; } AsyncStatus -CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync) +CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outAsync) { return outAsync->invokeCollocated(this); } -void +void CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) { Lock sync(*this); - + map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); if(p != _sendRequests.end()) { @@ -286,12 +155,12 @@ CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalExceptio Outgoing* o = dynamic_cast<Outgoing*>(out); if(o) { - for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q) + for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q) { if(q->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->completed(ex); + q->second->completed(ex); _requests.erase(q); return; // We're done. } @@ -322,7 +191,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); if(o) { - for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + for(map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { if(q->second.get() == o.get()) { @@ -338,7 +207,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs } void -CollocatedRequestHandler::invokeRequest(Outgoing* out) +CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) { int requestId = 0; { @@ -348,29 +217,31 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out) requestId = ++_requestId; _requests.insert(make_pair(requestId, out)); } + _sendRequests.insert(make_pair(out, requestId)); - } + } out->attachCollocatedObserver(_adapter, requestId); if(_reference->getInvocationTimeout() > 0) { // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum)); } else if(_dispatcher) { - _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, + batchRequestNum)); } else // Optimization: directly call invokeAll if there's no dispatcher. { out->sent(); - invokeAll(out->os(), requestId, 1, false); + invokeAll(out->os(), requestId, batchRequestNum); } } AsyncStatus -CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) +CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum) { int requestId = 0; { @@ -383,125 +254,25 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) requestId = ++_requestId; _asyncRequests.insert(make_pair(requestId, outAsync)); } + _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } outAsync->attachCollocatedObserver(_adapter, requestId); - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, + batchRequestNum)); return AsyncStatusQueued; } void -CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out) -{ - Int invokeNum; - { - Lock sync(*this); - while(_batchStreamInUse) - { - wait(); - } - - invokeNum = _batchRequestNum; - - if(_batchRequestNum > 0) - { - _sendRequests.insert(make_pair(out, 0)); - - assert(!_batchStream.b.empty()); - _batchStream.swap(*out->os()); - - // - // Reset the batch stream. - // - BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - out->attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - if(_reference->getInvocationTimeout() > 0) - { - _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, 0, invokeNum, true)); - } - else if(_dispatcher) - { - _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, 0, invokeNum, true)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out->sent(); - invokeAll(out->os(), 0, invokeNum, true); // Invoke from the user thread. - } - } - else - { - out->sent(); - } -} - -AsyncStatus -CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) -{ - Int invokeNum; - { - Lock sync(*this); - while(_batchStreamInUse) - { - wait(); - } - - invokeNum = _batchRequestNum; - if(_batchRequestNum > 0) - { - outAsync->cancelable(this); // This will throw if the request is canceled - - _sendAsyncRequests.insert(make_pair(outAsync, 0)); - - assert(!_batchStream.b.empty()); - _batchStream.swap(*outAsync->getOs()); - - // - // Reset the batch stream. - // - BasicStream dummy(_reference->getInstance().get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - outAsync->attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true)); - return AsyncStatusQueued; - } - else if(outAsync->sent()) - { - return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); - } - else - { - return AsyncStatusSent; - } -} - -void CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd) { - OutgoingAsyncPtr outAsync; + OutgoingAsyncBasePtr outAsync; { Lock sync(*this); assert(_response); - + os->i = os->b.begin() + sizeof(replyHdr) + 4; if(_traceLevels->protocol >= 1) @@ -510,7 +281,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo traceRecv(*os, _logger, _traceLevels); } - map<int, Outgoing*>::iterator p = _requests.find(requestId); + map<int, OutgoingBase*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { p->second->completed(*os); @@ -518,7 +289,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo } else { - map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { os->swap(*q->second->getIs()); @@ -565,7 +336,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException& return true; } -void +void CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum, bool amd) { handleException(requestId, ex, amd); @@ -616,9 +387,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) } void -CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNum, bool batch) +CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchRequestNum) { - if(batch) + if(batchRequestNum > 0) { os->i = os->b.begin() + sizeof(requestBatchHdr); } @@ -634,18 +405,19 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu { fillInValue(os, headerSize, requestId); } - else if(batch) + else if(batchRequestNum > 0) { - fillInValue(os, headerSize, invokeNum); + fillInValue(os, headerSize, batchRequestNum); } traceSend(*os, _logger, _traceLevels); } + int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1; ServantManagerPtr servantManager = _adapter->getServantManager(); try { while(invokeNum > 0) - { + { try { _adapter->incDirectCount(); @@ -675,11 +447,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo return; // Ignore exception for oneway messages. } - OutgoingAsyncPtr outAsync; + OutgoingAsyncBasePtr outAsync; { Lock sync(*this); - - map<int, Outgoing*>::iterator p = _requests.find(requestId); + + map<int, OutgoingBase*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { p->second->completed(ex); @@ -687,7 +459,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo } else { - map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { if(q->second->completed(ex)) |