diff options
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 84 |
1 files changed, 39 insertions, 45 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 5e9dee2b5a3..f6ab4092aa5 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -37,11 +37,7 @@ public: Int batchRequestNum) : _out(out), _os(os), -#ifdef ICE_CPP11_MAPPING - _handler(dynamic_pointer_cast<CollocatedRequestHandler>(handler->shared_from_this())), -#else - _handler(handler), -#endif + _handler(handler->shared_from_this()), _requestId(requestId), _batchRequestNum(batchRequestNum) { @@ -190,9 +186,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs _asyncRequests.erase(p->second); } _sendAsyncRequests.erase(p); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; @@ -206,9 +202,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs if(q->second.get() == o.get()) { _asyncRequests.erase(q); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } return; } @@ -257,7 +253,7 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) } AsyncStatus -CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum) +CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous) { // // Increase the direct count to prevent the thread pool from being destroyed before @@ -278,20 +274,10 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba if(_response) { requestId = ++_requestId; -#ifdef ICE_CPP11_MAPPING - _asyncRequests.insert(make_pair(requestId, - dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this()))); -#else - _asyncRequests.insert(make_pair(requestId, outAsync)); -#endif + _asyncRequests.insert(make_pair(requestId, outAsync->shared_from_this())); } -#ifdef ICE_CPP11_MAPPING - _sendAsyncRequests.insert(make_pair( - dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this()), requestId)); -#else - _sendAsyncRequests.insert(make_pair(outAsync, requestId)); -#endif + _sendAsyncRequests.insert(make_pair(outAsync->shared_from_this(), requestId)); } catch(...) { @@ -301,17 +287,30 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba outAsync->attachCollocatedObserver(_adapter, requestId); -#ifdef ICE_CPP11_MAPPING - _adapter->getThreadPool()->dispatch(new InvokeAllAsync( - dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this()), - outAsync->getOs(), - dynamic_pointer_cast<CollocatedRequestHandler>(shared_from_this()), - requestId, - batchRequestNum)); -#else - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, - batchRequestNum)); -#endif + if(!synchronous || !_response || _reference->getInvocationTimeout() > 0) + { + // Don't invoke from the user thread if async or invocation timeout is set + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync->shared_from_this(), + outAsync->getOs(), + shared_from_this(), + requestId, + batchRequestNum)); + } + else if(_dispatcher) + { + _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAllAsync(outAsync->shared_from_this(), + outAsync->getOs(), + shared_from_this(), + requestId, + batchRequestNum)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. + { + if(sentAsync(outAsync)) + { + invokeAll(outAsync->getOs(), requestId, batchRequestNum); + } + } return AsyncStatusQueued; } @@ -349,7 +348,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo if(q != _asyncRequests.end()) { is.swap(*q->second->getIs()); - if(q->second->completed()) + if(q->second->response()) { outAsync = q->second; } @@ -367,11 +366,11 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo // if(amd) { - outAsync->invokeCompletedAsync(); + outAsync->invokeResponseAsync(); } else { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); } } @@ -428,12 +427,7 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { { Lock sync(*this); -#ifdef ICE_CPP11_MAPPING - if(_sendAsyncRequests.erase(outAsync ? - dynamic_pointer_cast<IceInternal::OutgoingAsyncBase>(outAsync->shared_from_this()) : nullptr) == 0) -#else - if(_sendAsyncRequests.erase(outAsync) == 0) -#endif + if(_sendAsyncRequests.erase(outAsync->shared_from_this()) == 0) { return false; // The request timed-out. } @@ -533,7 +527,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - if(q->second->completed(ex)) + if(q->second->exception(ex)) { outAsync = q->second; } @@ -551,11 +545,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo // if(amd) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } else { - outAsync->invokeCompleted(); + outAsync->invokeException(); } } } |