summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CollocatedRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
commit710a9221852d6c92b1727a429a33b38f1f949352 (patch)
tree6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /cpp/src/Ice/CollocatedRequestHandler.cpp
parent- Fix for ICE-5578 - Python build failure (diff)
downloadice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2
ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz
ice-710a9221852d6c92b1727a429a33b38f1f949352.zip
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp116
1 files changed, 54 insertions, 62 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 896062a4d42..eb80eb5eeb7 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -134,6 +134,7 @@ fillInValue(BasicStream* os, int pos, Int value)
CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) :
RequestHandler(ref),
_adapter(ObjectAdapterIPtr::dynamicCast(adapter)),
+ _dispatcher(_reference->getInstance()->initializationData().dispatcher),
_logger(_reference->getInstance()->initializationData().logger), // Cached for better performance.
_traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance.
_batchAutoFlush(
@@ -191,7 +192,7 @@ CollocatedRequestHandler::finishBatchRequest(BasicStream* os)
vector<Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
_batchStream.b.resize(_batchMarker);
- _adapter->getThreadPool()->execute(new InvokeBatchRequests(this, _batchStream, _batchRequestNum));
+ _adapter->getThreadPool()->dispatch(new InvokeBatchRequests(this, _batchStream, _batchRequestNum));
//
// Reset the batch.
@@ -273,23 +274,22 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
_requests.erase(p->second);
}
InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex, false);
+ out->finished(ex);
_sendRequests.erase(p);
+ return;
}
- else
+
+ Outgoing* o = dynamic_cast<Outgoing*>(out);
+ if(o)
{
- Outgoing* o = dynamic_cast<Outgoing*>(out);
- if(o)
+ for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
{
- for(map<Int, Outgoing*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
+ if(q->second == o)
{
- if(q->second == o)
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex, true);
- _requests.erase(q);
- return; // We're done.
- }
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->finished(ex);
+ _requests.erase(q);
+ return; // We're done.
}
}
}
@@ -298,46 +298,33 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
void
CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
- OutgoingAsyncMessageCallbackPtr out;
- bool sent;
+ Lock sync(*this);
+
+ map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
+ if(p != _sendAsyncRequests.end())
{
- Lock sync(*this);
-
- map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
- if(p != _sendAsyncRequests.end())
+ if(p->second > 0)
{
- if(p->second > 0)
- {
- _asyncRequests.erase(p->second);
- }
- out = p->first;
- sent = false;
- _sendAsyncRequests.erase(p);
+ _asyncRequests.erase(p->second);
}
- else
+ _sendAsyncRequests.erase(p);
+ outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ return;
+ }
+
+ OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+ if(o)
+ {
+ for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
- OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
- if(o)
+ if(q->second.get() == o.get())
{
- for(map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
- {
- if(q->second.get() == o.get())
- {
- out = o;
- sent = true;
- _asyncRequests.erase(q);
- break;
- }
- }
+ _asyncRequests.erase(q);
+ outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ return;
}
}
}
-
- if(out)
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- out->__finished(ex, sent);
- }
}
void
@@ -362,12 +349,17 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out)
if(_reference->getInvocationTimeout() > 0)
{
- _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, requestId, 1, false));
+ // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
+ _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, 1, false));
}
- else
+ else if(_dispatcher)
+ {
+ _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId, 1, false));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
{
out->sent();
- invokeAll(out->os(), requestId, 1, false); // Invoke from the user thread.
+ invokeAll(out->os(), requestId, 1, false);
}
}
@@ -391,7 +383,7 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
outAsync->__attachCollocatedObserver(_adapter, requestId);
- _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
return AsyncStatusQueued;
}
@@ -434,9 +426,13 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
{
if(_reference->getInvocationTimeout() > 0)
{
- _adapter->getThreadPool()->execute(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
+ _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
}
- else
+ else if(_dispatcher)
+ {
+ _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, 0, invokeNum, true));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
{
out->sent();
invokeAll(out->os(), 0, invokeNum, true); // Invoke from the user thread.
@@ -484,7 +480,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
if(invokeNum > 0)
{
- _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum, true));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true));
return AsyncStatusQueued;
}
else if(outAsync->__sent())
@@ -572,12 +568,8 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
{
return false; // The request timed-out.
}
- out->sent();
- }
- else
- {
- out->sent();
}
+ out->sent();
return true;
}
@@ -636,7 +628,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const ObjectAdapterDeactivatedException& ex)
{
- handleException(requestId, ex, false);
+ handleException(requestId, ex);
return;
}
@@ -647,7 +639,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
catch(const SystemException& ex)
{
- handleException(requestId, ex, true);
+ handleException(requestId, ex);
_adapter->decDirectCount();
}
--invokeNum;
@@ -660,7 +652,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int invokeNu
}
void
-CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bool sent)
+CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
{
if(requestId == 0)
{
@@ -674,7 +666,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
map<int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
- p->second->finished(ex, sent);
+ p->second->finished(ex);
_requests.erase(p);
}
else
@@ -689,6 +681,6 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
}
if(outAsync)
{
- outAsync->__finished(ex, sent);
+ outAsync->__finished(ex);
}
}