summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CollocatedRequestHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp273
1 files changed, 84 insertions, 189 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index e5fb76c4bca..a193c9f5f5d 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -14,7 +14,6 @@
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/TraceLevels.h>
-#include <Ice/Outgoing.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/TraceUtil.h>
@@ -26,44 +25,13 @@ using namespace IceInternal;
namespace
{
-class InvokeAll : public DispatchWorkItem
-{
-public:
-
- InvokeAll(OutgoingBase* out,
- BasicStream* os,
- CollocatedRequestHandler* handler,
- Int requestId,
- Int batchRequestNum) :
- _out(out), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum)
- {
- }
-
- virtual void
- run()
- {
- if(_handler->sent(_out))
- {
- _handler->invokeAll(_os, _requestId, _batchRequestNum);
- }
- }
-
-private:
-
- OutgoingBase* _out;
- BasicStream* _os;
- CollocatedRequestHandlerPtr _handler;
- Int _requestId;
- Int _batchRequestNum;
-};
-
class InvokeAllAsync : public DispatchWorkItem
{
public:
InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync,
- BasicStream* os,
- CollocatedRequestHandler* handler,
+ OutputStream* os,
+ const CollocatedRequestHandlerPtr& handler,
Int requestId,
Int batchRequestNum) :
_outAsync(outAsync), _os(os), _handler(handler), _requestId(requestId), _batchRequestNum(batchRequestNum)
@@ -82,14 +50,14 @@ public:
private:
OutgoingAsyncBasePtr _outAsync;
- BasicStream* _os;
+ OutputStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
Int _batchRequestNum;
};
void
-fillInValue(BasicStream* os, int pos, Int value)
+fillInValue(OutputStream* os, int pos, Int value)
{
const Byte* p = reinterpret_cast<const Byte*>(&value);
#ifdef ICE_BIG_ENDIAN
@@ -103,7 +71,7 @@ fillInValue(BasicStream* os, int pos, Int value)
CollocatedRequestHandler::CollocatedRequestHandler(const ReferencePtr& ref, const ObjectAdapterPtr& adapter) :
RequestHandler(ref),
- _adapter(ObjectAdapterIPtr::dynamicCast(adapter)),
+ _adapter(ICE_DYNAMIC_CAST(ObjectAdapterI, adapter)),
_dispatcher(_reference->getInstance()->initializationData().dispatcher),
_logger(_reference->getInstance()->initializationData().logger), // Cached for better performance.
_traceLevels(_reference->getInstance()->traceLevels()), // Cached for better performance.
@@ -118,14 +86,7 @@ CollocatedRequestHandler::~CollocatedRequestHandler()
RequestHandlerPtr
CollocatedRequestHandler::update(const RequestHandlerPtr& previousHandler, const RequestHandlerPtr& newHandler)
{
- return previousHandler.get() == this ? newHandler : this;
-}
-
-bool
-CollocatedRequestHandler::sendRequest(ProxyOutgoingBase* out)
-{
- out->invokeCollocated(this);
- return !_response && _reference->getInvocationTimeout() == 0;
+ return previousHandler.get() == this ? newHandler : ICE_SHARED_FROM_THIS;
}
AsyncStatus
@@ -135,41 +96,6 @@ CollocatedRequestHandler::sendAsyncRequest(const ProxyOutgoingAsyncBasePtr& outA
}
void
-CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex)
-{
- Lock sync(*this);
-
- map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out);
- if(p != _sendRequests.end())
- {
- if(p->second > 0)
- {
- _requests.erase(p->second);
- }
- InvocationTimeoutException ex(__FILE__, __LINE__);
- out->completed(ex);
- _sendRequests.erase(p);
- _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
- return;
- }
-
- Outgoing* o = dynamic_cast<Outgoing*>(out);
- if(o)
- {
- for(map<Int, OutgoingBase*>::iterator q = _requests.begin(); q != _requests.end(); ++q)
- {
- if(q->second == o)
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- q->second->completed(ex);
- _requests.erase(q);
- return; // We're done.
- }
- }
- }
-}
-
-void
CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
Lock sync(*this);
@@ -182,15 +108,15 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
_asyncRequests.erase(p->second);
}
_sendAsyncRequests.erase(p);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
_adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
}
- OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+ OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, outAsync);
if(o)
{
for(map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
@@ -198,9 +124,9 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
if(q->second.get() == o.get())
{
_asyncRequests.erase(q);
- if(outAsync->completed(ex))
+ if(outAsync->exception(ex))
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
return;
}
@@ -208,8 +134,8 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
}
}
-void
-CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
+AsyncStatus
+CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous)
{
//
// Increase the direct count to prevent the thread pool from being destroyed before
@@ -218,107 +144,95 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
_adapter->incDirectCount();
int requestId = 0;
+ try
{
Lock sync(*this);
+
+ //
+ // This will throw if the request is canceled
+ //
+ outAsync->cancelable(ICE_SHARED_FROM_THIS);
+
if(_response)
{
requestId = ++_requestId;
- _requests.insert(make_pair(requestId, out));
+ _asyncRequests.insert(make_pair(requestId, ICE_GET_SHARED_FROM_THIS(outAsync)));
}
- _sendRequests.insert(make_pair(out, requestId));
+ _sendAsyncRequests.insert(make_pair(ICE_GET_SHARED_FROM_THIS(outAsync), requestId));
+ }
+ catch(...)
+ {
+ _adapter->decDirectCount();
+ throw;
}
- out->attachCollocatedObserver(_adapter, requestId);
+ outAsync->attachCollocatedObserver(_adapter, requestId);
- if(_reference->getInvocationTimeout() > 0)
+ if(!synchronous || !_response || _reference->getInvocationTimeout() > 0)
{
- // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
- _adapter->getThreadPool()->dispatch(new InvokeAll(out, out->os(), this, requestId, batchRequestNum));
+ // Don't invoke from the user thread if async or invocation timeout is set
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(ICE_GET_SHARED_FROM_THIS(outAsync),
+ outAsync->getOs(),
+ ICE_SHARED_FROM_THIS,
+ requestId,
+ batchRequestNum));
}
else if(_dispatcher)
{
- _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAll(out, out->os(), this, requestId,
- batchRequestNum));
+ _adapter->getThreadPool()->dispatchFromThisThread(new InvokeAllAsync(ICE_GET_SHARED_FROM_THIS(outAsync),
+ outAsync->getOs(),
+ ICE_SHARED_FROM_THIS,
+ requestId,
+ batchRequestNum));
}
else // Optimization: directly call invokeAll if there's no dispatcher.
{
- out->sent();
- invokeAll(out->os(), requestId, batchRequestNum);
- }
-}
-
-AsyncStatus
-CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum)
-{
- //
- // Increase the direct count to prevent the thread pool from being destroyed before
- // invokeAll is called. This will also throw if the object adapter has been deactivated.
- //
- _adapter->incDirectCount();
-
- int requestId = 0;
- try
- {
- Lock sync(*this);
-
- outAsync->cancelable(this); // This will throw if the request is canceled
+ //
+ // Make sure to hold a reference on this handler while the call is being
+ // dispatched. Otherwise, the handler could be deleted during the dispatch
+ // if a retry occurs.
+ //
- if(_response)
+ CollocatedRequestHandlerPtr self(ICE_SHARED_FROM_THIS);
+ if(sentAsync(outAsync))
{
- requestId = ++_requestId;
- _asyncRequests.insert(make_pair(requestId, outAsync));
+ invokeAll(outAsync->getOs(), requestId, batchRequestNum);
}
-
- _sendAsyncRequests.insert(make_pair(outAsync, requestId));
- }
- catch(...)
- {
- _adapter->decDirectCount();
- throw;
}
-
- outAsync->attachCollocatedObserver(_adapter, requestId);
-
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId,
- batchRequestNum));
return AsyncStatusQueued;
}
void
-CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, bool amd)
+CollocatedRequestHandler::sendResponse(Int requestId, OutputStream* os, Byte, bool amd)
{
OutgoingAsyncBasePtr outAsync;
{
Lock sync(*this);
assert(_response);
- os->i = os->b.begin() + sizeof(replyHdr) + 4;
-
if(_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<Int>(os->b.size()));
- traceRecv(*os, _logger, _traceLevels);
}
- map<int, OutgoingBase*>::iterator p = _requests.find(requestId);
- if(p != _requests.end())
+ InputStream is(os->instance(), os->getEncoding(), *os, true); // Adopting the OutputStream's buffer.
+ is.pos(sizeof(replyHdr) + 4);
+
+ if(_traceLevels->protocol >= 1)
{
- p->second->completed(*os);
- _requests.erase(p);
+ traceRecv(is, _logger, _traceLevels);
}
- else
+
+ map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
+ if(q != _asyncRequests.end())
{
- map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
- if(q != _asyncRequests.end())
+ is.swap(*q->second->getIs());
+ if(q->second->response())
{
- os->swap(*q->second->getIs());
- if(q->second->completed())
- {
- outAsync = q->second;
- }
- _asyncRequests.erase(q);
+ outAsync = q->second;
}
+ _asyncRequests.erase(q);
}
}
@@ -331,11 +245,11 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte, boo
//
if(amd)
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeResponseAsync();
}
else
{
- outAsync->invokeCompleted();
+ outAsync->invokeResponse();
}
}
@@ -376,23 +290,11 @@ CollocatedRequestHandler::waitForConnection()
}
bool
-CollocatedRequestHandler::sent(OutgoingBase* out)
-{
- Lock sync(*this);
- if(_sendRequests.erase(out) == 0)
- {
- return false; // The request timed-out.
- }
- out->sent();
- return true;
-}
-
-bool
CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
{
{
Lock sync(*this);
- if(_sendAsyncRequests.erase(outAsync) == 0)
+ if(_sendAsyncRequests.erase(ICE_GET_SHARED_FROM_THIS(outAsync)) == 0)
{
return false; // The request timed-out.
}
@@ -407,17 +309,8 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
}
void
-CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchRequestNum)
+CollocatedRequestHandler::invokeAll(OutputStream* os, Int requestId, Int batchRequestNum)
{
- if(batchRequestNum > 0)
- {
- os->i = os->b.begin() + sizeof(requestBatchHdr);
- }
- else
- {
- os->i = os->b.begin() + sizeof(requestHdr);
- }
-
if(_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<Int>(os->b.size()));
@@ -432,6 +325,17 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq
traceSend(*os, _logger, _traceLevels);
}
+ InputStream is(os->instance(), os->getEncoding(), *os);
+
+ if(batchRequestNum > 0)
+ {
+ is.pos(sizeof(requestBatchHdr));
+ }
+ else
+ {
+ is.pos(sizeof(requestHdr));
+ }
+
int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1;
ServantManagerPtr servantManager = _adapter->getServantManager();
try
@@ -455,7 +359,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq
}
Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId);
- in.invoke(servantManager, os);
+ in.invoke(servantManager, &is);
--invokeNum;
}
}
@@ -479,23 +383,14 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
{
Lock sync(*this);
- map<int, OutgoingBase*>::iterator p = _requests.find(requestId);
- if(p != _requests.end())
- {
- p->second->completed(ex);
- _requests.erase(p);
- }
- else
+ map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
+ if(q != _asyncRequests.end())
{
- map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
- if(q != _asyncRequests.end())
+ if(q->second->exception(ex))
{
- if(q->second->completed(ex))
- {
- outAsync = q->second;
- }
- _asyncRequests.erase(q);
+ outAsync = q->second;
}
+ _asyncRequests.erase(q);
}
}
@@ -508,11 +403,11 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex, bo
//
if(amd)
{
- outAsync->invokeCompletedAsync();
+ outAsync->invokeExceptionAsync();
}
else
{
- outAsync->invokeCompleted();
+ outAsync->invokeException();
}
}
}