diff options
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 104 |
1 files changed, 59 insertions, 45 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 0b08198726a..3fc321735cb 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,6 +14,8 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> +#include <Ice/Outgoing.h> +#include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -28,7 +30,7 @@ class InvokeAll : public DispatchWorkItem { public: - InvokeAll(OutgoingMessageCallback* out, + InvokeAll(OutgoingBase* out, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -49,7 +51,7 @@ public: private: - OutgoingMessageCallback* _out; + OutgoingBase* _out; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -61,7 +63,7 @@ class InvokeAllAsync : public DispatchWorkItem { public: - InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync, + InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -82,7 +84,7 @@ public: private: - OutgoingAsyncMessageCallbackPtr _outAsync; + OutgoingAsyncBasePtr _outAsync; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -113,7 +115,7 @@ public: private: const CollocatedRequestHandlerPtr _handler; - const OutgoingAsyncMessageCallbackPtr _outAsync; + const OutgoingAsyncBasePtr _outAsync; BasicStream _stream; Int _invokeNum; }; @@ -261,24 +263,24 @@ CollocatedRequestHandler::abortBatchRequest() } bool -CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out) +CollocatedRequestHandler::sendRequest(OutgoingBase* out) { out->invokeCollocated(this); return !_response && _reference->getInvocationTimeout() == 0; } AsyncStatus -CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync) +CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync) { - return outAsync->__invokeCollocated(this); + return outAsync->invokeCollocated(this); } void -CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) { Lock sync(*this); - map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out); + map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); if(p != _sendRequests.end()) { if(p->second > 0) @@ -286,7 +288,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _requests.erase(p->second); } InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); _sendRequests.erase(p); return; } @@ -299,7 +301,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) if(q->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(q); return; // We're done. } @@ -307,12 +309,12 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) } } -void -CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +void +CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); - - map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); + + map<OutgoingAsyncBasePtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); if(p != _sendAsyncRequests.end()) { if(p->second > 0) @@ -320,7 +322,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac _asyncRequests.erase(p->second); } _sendAsyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } @@ -332,7 +337,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac if(q->second.get() == o.get()) { _asyncRequests.erase(q); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } } @@ -391,16 +399,17 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) { _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } + outAsync->cancelable(this); } - outAsync->__attachCollocatedObserver(_adapter, requestId); + outAsync->attachCollocatedObserver(_adapter, requestId); - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false)); return AsyncStatusQueued; } void -CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) +CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out) { Int invokeNum; { @@ -457,7 +466,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) } AsyncStatus -CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) +CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) { Int invokeNum; { @@ -473,10 +482,12 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) if(_reference->getInvocationTimeout() > 0) { _sendAsyncRequests.insert(make_pair(outAsync, 0)); + + outAsync->cancelable(this); } assert(!_batchStream.b.empty()); - _batchStream.swap(*outAsync->__getOs()); + _batchStream.swap(*outAsync->getOs()); // // Reset the batch stream. @@ -488,14 +499,14 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) } } - outAsync->__attachCollocatedObserver(_adapter, 0); + outAsync->attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true)); return AsyncStatusQueued; } - else if(outAsync->__sent()) + else if(outAsync->sent()) { return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); } @@ -512,7 +523,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) { Lock sync(*this); assert(_response); - + os->i = os->b.begin() + sizeof(replyHdr) + 4; if(_traceLevels->protocol >= 1) @@ -524,7 +535,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(*os); + p->second->completed(*os); _requests.erase(p); } else @@ -532,17 +543,21 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - os->swap(*q->second->__getIs()); - outAsync = q->second; + os->swap(*q->second->getIs()); + if(q->second->completed()) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } - if(outAsync && outAsync->__finished()) + if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } + _adapter->decDirectCount(); } @@ -563,12 +578,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException& void CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum) { - if(requestId > 0) - { - Lock sync(*this); - _requests.erase(requestId); - _asyncRequests.erase(requestId); - } + handleException(requestId, ex); _adapter->decDirectCount(); } @@ -585,7 +595,7 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingMessageCallback* out) +CollocatedRequestHandler::sent(OutgoingBase* out) { if(_reference->getInvocationTimeout() > 0) { @@ -600,7 +610,7 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out) } bool -CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) +CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { if(_reference->getInvocationTimeout() > 0) { @@ -610,9 +620,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) return false; // The request timed-out. } } - if(outAsync->__sent()) + if(outAsync->sent()) { - outAsync->__invokeSent(); + outAsync->invokeSent(); } return true; } @@ -684,7 +694,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(ex); + p->second->completed(ex); _requests.erase(p); } else @@ -692,13 +702,17 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - outAsync = q->second; + if(q->second->completed(ex)) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } + if(outAsync) { - outAsync->__finished(ex); + outAsync->invokeCompleted(); } } |