diff options
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 273 |
1 files changed, 84 insertions, 189 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index e5fb76c4bca..a193c9f5f5d 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,7 +14,6 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -26,44 +25,13 @@ using namespace IceInternal; namespace { -class InvokeAll : public DispatchWorkItem -{ -public: - - InvokeAll(OutgoingBase* out, - BasicStream* os, - CollocatedRequestHandler* handler, - Int requestId, - Int batchRequestNum) : - _out(out), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum) - { - } - - virtual void - run() - { - if(_handler->sent(_out)) - { - _handler->invokeAll(_os, _requestId, _batchRequestNum); - } - } - -private: - - OutgoingBase* _out; - BasicStream* _os; - CollocatedRequestHandlerPtr _handler; - Int _requestId; - Int _batchRequestNum; -}; - class InvokeAllAsync : public DispatchWorkItem { public: InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync, - BasicStream* os, - CollocatedRequestHandler* handler, + OutputStream* os, + const CollocatedRequestHandlerPtr& handler, Int requestId, Int batchRequestNum) : _outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum) @@ -82,14 +50,14 @@ public: private: OutgoingAsyncBasePtr _outAsync; - BasicStream* _os; + OutputStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; Int _batchRequestNum; }; void -fillInValue(BasicStream* os, int pos, Int value) +fillInValue(OutputStream* os, int pos, Int value) { const Byte* p = reinterpret_cast<const Byte*>(&value); #ifdef ICE_BIG_ENDIAN @@ -103,7 +71,7 @@ fillInValue(BasicStream* os, int pos, Int value) CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) : RequestHandler(ref), - _adapter(ObjectAdapterIPtr::dynamicCast(adapter)), + _adapter(ICE_DYNAMIC_CAST(ObjectAdapterI, adapter)), _dispatcher(_reference->getInstance()->initializationData().dispatcher), _logger(_reference->getInstance()->initializationData().logger), // Cached for better performance. _traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance. @@ -118,14 +86,7 @@ CollocatedRequestHandler::~CollocatedRequestHandler() RequestHandlerPtr CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler) { - return previousHandler.get() == this ? newHandler : this; -} - -bool -CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - out->invokeCollocated(this); - return !_response && _reference->getInvocationTimeout() == 0; + return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS; } AsyncStatus @@ -135,41 +96,6 @@ CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outA } void -CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) -{ - Lock sync(*this); - - map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); - if(p != _sendRequests.end()) - { - if(p->second > 0) - { - _requests.erase(p->second); - } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->completed(ex); - _sendRequests.erase(p); - _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. - return; - } - - Outgoing* o = dynamic_cast<Outgoing*>(out); - if(o) - { - for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q) - { - if(q->second == o) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - q->second->completed(ex); - _requests.erase(q); - return; // We're done. - } - } - } -} - -void CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); @@ -182,15 +108,15 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs _asyncRequests.erase(p->second); } _sendAsyncRequests.erase(p); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } - OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); + OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, outAsync); if(o) { for(map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) @@ -198,9 +124,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs if(q->second.get() == o.get()) { _asyncRequests.erase(q); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } return; } @@ -208,8 +134,8 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs } } -void -CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) +AsyncStatus +CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous) { // // Increase the direct count to prevent the thread pool from being destroyed before @@ -218,107 +144,95 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) _adapter->incDirectCount(); int requestId = 0; + try { Lock sync(*this); + + // + // This will throw if the request is canceled + // + outAsync->cancelable(ICE_SHARED_FROM_THIS); + if(_response) { requestId = ++_requestId; - _requests.insert(make_pair(requestId, out)); + _asyncRequests.insert(make_pair(requestId, ICE_GET_SHARED_FROM_THIS(outAsync))); } - _sendRequests.insert(make_pair(out, requestId)); + _sendAsyncRequests.insert(make_pair(ICE_GET_SHARED_FROM_THIS(outAsync), requestId)); + } + catch(...) + { + _adapter->decDirectCount(); + throw; } - out->attachCollocatedObserver(_adapter, requestId); + outAsync->attachCollocatedObserver(_adapter, requestId); - if(_reference->getInvocationTimeout() > 0) + if(!synchronous || !_response || _reference->getInvocationTimeout() > 0) { - // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum)); + // Don't invoke from the user thread if async or invocation timeout is set + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(ICE_GET_SHARED_FROM_THIS(outAsync), + outAsync->getOs(), + ICE_SHARED_FROM_THIS, + requestId, + batchRequestNum)); } else if(_dispatcher) { - _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, - batchRequestNum)); + _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAllAsync(ICE_GET_SHARED_FROM_THIS(outAsync), + outAsync->getOs(), + ICE_SHARED_FROM_THIS, + requestId, + batchRequestNum)); } else // Optimization: directly call invokeAll if there's no dispatcher. { - out->sent(); - invokeAll(out->os(), requestId, batchRequestNum); - } -} - -AsyncStatus -CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum) -{ - // - // Increase the direct count to prevent the thread pool from being destroyed before - // invokeAll is called. This will also throw if the object adapter has been deactivated. - // - _adapter->incDirectCount(); - - int requestId = 0; - try - { - Lock sync(*this); - - outAsync->cancelable(this); // This will throw if the request is canceled + // + // Make sure to hold a reference on this handler while the call is being + // dispatched. Otherwise, the handler could be deleted during the dispatch + // if a retry occurs. + // - if(_response) + CollocatedRequestHandlerPtr self(ICE_SHARED_FROM_THIS); + if(sentAsync(outAsync)) { - requestId = ++_requestId; - _asyncRequests.insert(make_pair(requestId, outAsync)); + invokeAll(outAsync->getOs(), requestId, batchRequestNum); } - - _sendAsyncRequests.insert(make_pair(outAsync, requestId)); - } - catch(...) - { - _adapter->decDirectCount(); - throw; } - - outAsync->attachCollocatedObserver(_adapter, requestId); - - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, - batchRequestNum)); return AsyncStatusQueued; } void -CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd) +CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bool amd) { OutgoingAsyncBasePtr outAsync; { Lock sync(*this); assert(_response); - os->i = os->b.begin() + sizeof(replyHdr) + 4; - if(_traceLevels->protocol >= 1) { fillInValue(os, 10, static_cast<Int>(os->b.size())); - traceRecv(*os, _logger, _traceLevels); } - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) + InputStream is(os->instance(), os->getEncoding(), *os, true); // Adopting the OutputStream's buffer. + is.pos(sizeof(replyHdr) + 4); + + if(_traceLevels->protocol >= 1) { - p->second->completed(*os); - _requests.erase(p); + traceRecv(is, _logger, _traceLevels); } - else + + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + is.swap(*q->second->getIs()); + if(q->second->response()) { - os->swap(*q->second->getIs()); - if(q->second->completed()) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } @@ -331,11 +245,11 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo // if(amd) { - outAsync->invokeCompletedAsync(); + outAsync->invokeResponseAsync(); } else { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); } } @@ -376,23 +290,11 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingBase* out) -{ - Lock sync(*this); - if(_sendRequests.erase(out) == 0) - { - return false; // The request timed-out. - } - out->sent(); - return true; -} - -bool CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { { Lock sync(*this); - if(_sendAsyncRequests.erase(outAsync) == 0) + if(_sendAsyncRequests.erase(ICE_GET_SHARED_FROM_THIS(outAsync)) == 0) { return false; // The request timed-out. } @@ -407,17 +309,8 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) } void -CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchRequestNum) +CollocatedRequestHandler::invokeAll(OutputStream* os, Int requestId, Int batchRequestNum) { - if(batchRequestNum > 0) - { - os->i = os->b.begin() + sizeof(requestBatchHdr); - } - else - { - os->i = os->b.begin() + sizeof(requestHdr); - } - if(_traceLevels->protocol >= 1) { fillInValue(os, 10, static_cast<Int>(os->b.size())); @@ -432,6 +325,17 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq traceSend(*os, _logger, _traceLevels); } + InputStream is(os->instance(), os->getEncoding(), *os); + + if(batchRequestNum > 0) + { + is.pos(sizeof(requestBatchHdr)); + } + else + { + is.pos(sizeof(requestHdr)); + } + int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1; ServantManagerPtr servantManager = _adapter->getServantManager(); try @@ -455,7 +359,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq } Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId); - in.invoke(servantManager, os); + in.invoke(servantManager, &is); --invokeNum; } } @@ -479,23 +383,14 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo { Lock sync(*this); - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - p->second->completed(ex); - _requests.erase(p); - } - else + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + if(q->second->exception(ex)) { - if(q->second->completed(ex)) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } @@ -508,11 +403,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo // if(amd) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } else { - outAsync->invokeCompleted(); + outAsync->invokeException(); } } } |