diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-07-15 10:22:40 +0200 |
commit | 710a9221852d6c92b1727a429a33b38f1f949352 (patch) | |
tree | 6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /cpp/src/Ice/CollocatedRequestHandler.cpp | |
parent | - Fix for ICE-5578 - Python build failure (diff) | |
download | ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2 ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz ice-710a9221852d6c92b1727a429a33b38f1f949352.zip |
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 116 |
1 files changed, 54 insertions, 62 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 896062a4d42..eb80eb5eeb7 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -134,6 +134,7 @@ fillInValue(BasicStream* os, int pos, Int value) CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) : RequestHandler(ref), _adapter(ObjectAdapterIPtr::dynamicCast(adapter)), + _dispatcher(_reference->getInstance()->initializationData().dispatcher), _logger(_reference->getInstance()->initializationData().logger), // Cached for better performance. _traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance. _batchAutoFlush( @@ -191,7 +192,7 @@ CollocatedRequestHandler::finishBatchRequest(BasicStream* os) vector<Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()); _batchStream.b.resize(_batchMarker); - _adapter->getThreadPool()->execute(new InvokeBatchRequests(this, _batchStream, _batchRequestNum)); + _adapter->getThreadPool()->dispatch(new InvokeBatchRequests(this, _batchStream, _batchRequestNum)); // // Reset the batch. @@ -273,23 +274,22 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _requests.erase(p->second); } InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex, false); + out->finished(ex); _sendRequests.erase(p); + return; } - else + + Outgoing* o = dynamic_cast<Outgoing*>(out); + if(o) { - Outgoing* o = dynamic_cast<Outgoing*>(out); - if(o) + for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q) { - for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q) + if(q->second == o) { - if(q->second == o) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex, true); - _requests.erase(q); - return; // We're done. - } + InvocationTimeoutException ex(__FILE__, __LINE__); + o->finished(ex); + _requests.erase(q); + return; // We're done. } } } @@ -298,46 +298,33 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) void CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { - OutgoingAsyncMessageCallbackPtr out; - bool sent; + Lock sync(*this); + + map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); + if(p != _sendAsyncRequests.end()) { - Lock sync(*this); - - map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); - if(p != _sendAsyncRequests.end()) + if(p->second > 0) { - if(p->second > 0) - { - _asyncRequests.erase(p->second); - } - out = p->first; - sent = false; - _sendAsyncRequests.erase(p); + _asyncRequests.erase(p->second); } - else + _sendAsyncRequests.erase(p); + outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + return; + } + + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); + if(o) + { + for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); - if(o) + if(q->second.get() == o.get()) { - for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) - { - if(q->second.get() == o.get()) - { - out = o; - sent = true; - _asyncRequests.erase(q); - break; - } - } + _asyncRequests.erase(q); + outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + return; } } } - - if(out) - { - InvocationTimeoutException ex(__FILE__, __LINE__); - out->__finished(ex, sent); - } } void @@ -362,12 +349,17 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out) if(_reference->getInvocationTimeout() > 0) { - _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, requestId, 1, false)); + // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. + _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, 1, false)); } - else + else if(_dispatcher) + { + _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, 1, false)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. { out->sent(); - invokeAll(out->os(), requestId, 1, false); // Invoke from the user thread. + invokeAll(out->os(), requestId, 1, false); } } @@ -391,7 +383,7 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) outAsync->__attachCollocatedObserver(_adapter, requestId); - _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); return AsyncStatusQueued; } @@ -434,9 +426,13 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) { if(_reference->getInvocationTimeout() > 0) { - _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, 0, invokeNum, true)); + _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, 0, invokeNum, true)); } - else + else if(_dispatcher) + { + _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, 0, invokeNum, true)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. { out->sent(); invokeAll(out->os(), 0, invokeNum, true); // Invoke from the user thread. @@ -484,7 +480,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) if(invokeNum > 0) { - _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum, true)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true)); return AsyncStatusQueued; } else if(outAsync->__sent()) @@ -572,12 +568,8 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out) { return false; // The request timed-out. } - out->sent(); - } - else - { - out->sent(); } + out->sent(); return true; } @@ -636,7 +628,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu } catch(const ObjectAdapterDeactivatedException& ex) { - handleException(requestId, ex, false); + handleException(requestId, ex); return; } @@ -647,7 +639,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu } catch(const SystemException& ex) { - handleException(requestId, ex, true); + handleException(requestId, ex); _adapter->decDirectCount(); } --invokeNum; @@ -660,7 +652,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu } void -CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool sent) +CollocatedRequestHandler::handleException(int requestId, const Exception& ex) { if(requestId == 0) { @@ -674,7 +666,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(ex, sent); + p->second->finished(ex); _requests.erase(p); } else @@ -689,6 +681,6 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo } if(outAsync) { - outAsync->__finished(ex, sent); + outAsync->__finished(ex); } } |