summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CollocatedRequestHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp84
1 files changed, 39 insertions, 45 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 5e9dee2b5a3..f6ab4092aa5 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -37,11 +37,7 @@ public:
Int batchRequestNum) :
_out(out),
_os(os),
-#ifdef ICE_CPP11_MAPPING
- _handler(dynamic_pointer_cast<CollocatedRequestHandler>(handler->shared_from_this())),
-#else
- _handler(handler),
-#endif
+ _handler(handler->shared_from_this()),
_requestId(requestId),
_batchRequestNum(batchRequestNum)
{
@@ -190,9 +186,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
_asyncRequests.erase(p->second);
}
_sendAsyncRequests.erase(p);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
_adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
@@ -206,9 +202,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
if(q->second.get() == o.get())
{
_asyncRequests.erase(q);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
return;
}
@@ -257,7 +253,7 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
}
AsyncStatus
-CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum)
+CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous)
{
//
// Increase the direct count to prevent the thread pool from being destroyed before
@@ -278,20 +274,10 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba
if(_response)
{
requestId = ++_requestId;
-#ifdef ICE_CPP11_MAPPING
- _asyncRequests.insert(make_pair(requestId,
- dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this())));
-#else
- _asyncRequests.insert(make_pair(requestId, outAsync));
-#endif
+ _asyncRequests.insert(make_pair(requestId, outAsync->shared_from_this()));
}
-#ifdef ICE_CPP11_MAPPING
- _sendAsyncRequests.insert(make_pair(
- dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this()), requestId));
-#else
- _sendAsyncRequests.insert(make_pair(outAsync, requestId));
-#endif
+ _sendAsyncRequests.insert(make_pair(outAsync->shared_from_this(), requestId));
}
catch(...)
{
@@ -301,17 +287,30 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba
outAsync->attachCollocatedObserver(_adapter, requestId);
-#ifdef ICE_CPP11_MAPPING
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(
- dynamic_pointer_cast<OutgoingAsyncBase>(outAsync->shared_from_this()),
- outAsync->getOs(),
- dynamic_pointer_cast<CollocatedRequestHandler>(shared_from_this()),
- requestId,
- batchRequestNum));
-#else
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId,
- batchRequestNum));
-#endif
+ if(!synchronous || !_response || _reference->getInvocationTimeout() > 0)
+ {
+ // Don't invoke from the user thread if async or invocation timeout is set
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync->shared_from_this(),
+ outAsync->getOs(),
+ shared_from_this(),
+ requestId,
+ batchRequestNum));
+ }
+ else if(_dispatcher)
+ {
+ _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAllAsync(outAsync->shared_from_this(),
+ outAsync->getOs(),
+ shared_from_this(),
+ requestId,
+ batchRequestNum));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
+ {
+ if(sentAsync(outAsync))
+ {
+ invokeAll(outAsync->getOs(), requestId, batchRequestNum);
+ }
+ }
return AsyncStatusQueued;
}
@@ -349,7 +348,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo
if(q != _asyncRequests.end())
{
is.swap(*q->second->getIs());
- if(q->second->completed())
+ if(q->second->response())
{
outAsync = q->second;
}
@@ -367,11 +366,11 @@ CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bo
//
if(amd)
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeResponseAsync();
}
else
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
}
}
@@ -428,12 +427,7 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
{
{
Lock sync(*this);
-#ifdef ICE_CPP11_MAPPING
- if(_sendAsyncRequests.erase(outAsync ?
- dynamic_pointer_cast<IceInternal::OutgoingAsyncBase>(outAsync->shared_from_this()) : nullptr) == 0)
-#else
- if(_sendAsyncRequests.erase(outAsync) == 0)
-#endif
+ if(_sendAsyncRequests.erase(outAsync->shared_from_this()) == 0)
{
return false; // The request timed-out.
}
@@ -533,7 +527,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
- if(q->second->completed(ex))
+ if(q->second->exception(ex))
{
outAsync = q->second;
}
@@ -551,11 +545,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
//
if(amd)
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
else
{
- outAsync->invokeCompleted();
+ outAsync->invokeException();
}
}
}