diff options
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 170 |
1 files changed, 11 insertions, 159 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 1c061dc2b9c..ee35365b042 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,7 +14,6 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> -#include <Ice/Outgoing.h> #include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -26,41 +25,6 @@ using namespace IceInternal; namespace { -class InvokeAll : public DispatchWorkItem -{ -public: - - InvokeAll(OutgoingBase* out, - OutputStream* os, - CollocatedRequestHandler* handler, - Int requestId, - Int batchRequestNum) : - _out(out), - _os(os), - _handler(ICE_GET_SHARED_FROM_THIS(handler)), - _requestId(requestId), - _batchRequestNum(batchRequestNum) - { - } - - virtual void - run() - { - if(_handler->sent(_out)) - { - _handler->invokeAll(_os, _requestId, _batchRequestNum); - } - } - -private: - - OutgoingBase* _out; - OutputStream* _os; - CollocatedRequestHandlerPtr _handler; - Int _requestId; - Int _batchRequestNum; -}; - class InvokeAllAsync : public DispatchWorkItem { public: @@ -125,13 +89,6 @@ CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS; } -bool -CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out) -{ - out->invokeCollocated(this); - return !_response && _reference->getInvocationTimeout() == 0; -} - AsyncStatus CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outAsync) { @@ -139,41 +96,6 @@ CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outA } void -CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) -{ - Lock sync(*this); - - map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); - if(p != _sendRequests.end()) - { - if(p->second > 0) - { - _requests.erase(p->second); - } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->completed(ex); - _sendRequests.erase(p); - _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. - return; - } - - Outgoing* o = dynamic_cast<Outgoing*>(out); - if(o) - { - for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q) - { - if(q->second == o) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - q->second->completed(ex); - _requests.erase(q); - return; // We're done. - } - } - } -} - -void CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); @@ -212,46 +134,6 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs } } -void -CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) -{ - // - // Increase the direct count to prevent the thread pool from being destroyed before - // invokeAll is called. This will also throw if the object adapter has been deactivated. - // - _adapter->incDirectCount(); - - int requestId = 0; - { - Lock sync(*this); - if(_response) - { - requestId = ++_requestId; - _requests.insert(make_pair(requestId, out)); - } - - _sendRequests.insert(make_pair(out, requestId)); - } - - out->attachCollocatedObserver(_adapter, requestId); - - if(_reference->getInvocationTimeout() > 0) - { - // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum)); - } - else if(_dispatcher) - { - _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, - batchRequestNum)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out->sent(); - invokeAll(out->os(), requestId, batchRequestNum); - } -} - AsyncStatus CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous) { @@ -342,24 +224,15 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo traceRecv(is, _logger, _traceLevels); } - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - p->second->completed(is); - _requests.erase(p); - } - else + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + is.swap(*q->second->getIs()); + if(q->second->response()) { - is.swap(*q->second->getIs()); - if(q->second->response()) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } @@ -417,18 +290,6 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingBase* out) -{ - Lock sync(*this); - if(_sendRequests.erase(out) == 0) - { - return false; // The request timed-out. - } - out->sent(); - return true; -} - -bool CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { { @@ -522,23 +383,14 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo { Lock sync(*this); - map<int, OutgoingBase*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) + map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); + if(q != _asyncRequests.end()) { - p->second->completed(ex); - _requests.erase(p); - } - else - { - map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId); - if(q != _asyncRequests.end()) + if(q->second->exception(ex)) { - if(q->second->exception(ex)) - { - outAsync = q->second; - } - _asyncRequests.erase(q); + outAsync = q->second; } + _asyncRequests.erase(q); } } |