summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CollocatedRequestHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/CollocatedRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp104
1 files changed, 59 insertions, 45 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 0b08198726a..3fc321735cb 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -14,6 +14,8 @@
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/TraceLevels.h>
+#include <Ice/Outgoing.h>
+#include <Ice/OutgoingAsync.h>
#include <Ice/TraceUtil.h>
@@ -28,7 +30,7 @@ class InvokeAll : public DispatchWorkItem
{
public:
- InvokeAll(OutgoingMessageCallback* out,
+ InvokeAll(OutgoingBase* out,
BasicStream* os,
CollocatedRequestHandler* handler,
Int requestId,
@@ -49,7 +51,7 @@ public:
private:
- OutgoingMessageCallback* _out;
+ OutgoingBase* _out;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
@@ -61,7 +63,7 @@ class InvokeAllAsync : public DispatchWorkItem
{
public:
- InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync,
+ InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync,
BasicStream* os,
CollocatedRequestHandler* handler,
Int requestId,
@@ -82,7 +84,7 @@ public:
private:
- OutgoingAsyncMessageCallbackPtr _outAsync;
+ OutgoingAsyncBasePtr _outAsync;
BasicStream* _os;
CollocatedRequestHandlerPtr _handler;
Int _requestId;
@@ -113,7 +115,7 @@ public:
private:
const CollocatedRequestHandlerPtr _handler;
- const OutgoingAsyncMessageCallbackPtr _outAsync;
+ const OutgoingAsyncBasePtr _outAsync;
BasicStream _stream;
Int _invokeNum;
};
@@ -261,24 +263,24 @@ CollocatedRequestHandler::abortBatchRequest()
}
bool
-CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out)
+CollocatedRequestHandler::sendRequest(OutgoingBase* out)
{
out->invokeCollocated(this);
return !_response && _reference->getInvocationTimeout() == 0;
}
AsyncStatus
-CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync)
+CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync)
{
- return outAsync->__invokeCollocated(this);
+ return outAsync->invokeCollocated(this);
}
void
-CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
+CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex)
{
Lock sync(*this);
- map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out);
+ map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out);
if(p != _sendRequests.end())
{
if(p->second > 0)
@@ -286,7 +288,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
_requests.erase(p->second);
}
InvocationTimeoutException ex(__FILE__, __LINE__);
- out->finished(ex);
+ out->completed(ex);
_sendRequests.erase(p);
return;
}
@@ -299,7 +301,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
if(q->second == o)
{
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ o->completed(ex);
_requests.erase(q);
return; // We're done.
}
@@ -307,12 +309,12 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
}
}
-void
-CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
+void
+CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
Lock sync(*this);
-
- map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
+
+ map<OutgoingAsyncBasePtr, Int>::iterator p = _sendAsyncRequests.find(outAsync);
if(p != _sendAsyncRequests.end())
{
if(p->second > 0)
@@ -320,7 +322,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac
_asyncRequests.erase(p->second);
}
_sendAsyncRequests.erase(p);
- outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
return;
}
@@ -332,7 +337,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac
if(q->second.get() == o.get())
{
_asyncRequests.erase(q);
- outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0);
+ if(outAsync->completed(ex))
+ {
+ outAsync->invokeCompletedAsync();
+ }
return;
}
}
@@ -391,16 +399,17 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
{
_sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
+ outAsync->cancelable(this);
}
- outAsync->__attachCollocatedObserver(_adapter, requestId);
+ outAsync->attachCollocatedObserver(_adapter, requestId);
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false));
return AsyncStatusQueued;
}
void
-CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
+CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out)
{
Int invokeNum;
{
@@ -457,7 +466,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out)
}
AsyncStatus
-CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
+CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
{
Int invokeNum;
{
@@ -473,10 +482,12 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
if(_reference->getInvocationTimeout() > 0)
{
_sendAsyncRequests.insert(make_pair(outAsync, 0));
+
+ outAsync->cancelable(this);
}
assert(!_batchStream.b.empty());
- _batchStream.swap(*outAsync->__getOs());
+ _batchStream.swap(*outAsync->getOs());
//
// Reset the batch stream.
@@ -488,14 +499,14 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync)
}
}
- outAsync->__attachCollocatedObserver(_adapter, 0);
+ outAsync->attachCollocatedObserver(_adapter, 0);
if(invokeNum > 0)
{
- _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true));
+ _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true));
return AsyncStatusQueued;
}
- else if(outAsync->__sent())
+ else if(outAsync->sent())
{
return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
}
@@ -512,7 +523,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
{
Lock sync(*this);
assert(_response);
-
+
os->i = os->b.begin() + sizeof(replyHdr) + 4;
if(_traceLevels->protocol >= 1)
@@ -524,7 +535,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
map<int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
- p->second->finished(*os);
+ p->second->completed(*os);
_requests.erase(p);
}
else
@@ -532,17 +543,21 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
- os->swap(*q->second->__getIs());
- outAsync = q->second;
+ os->swap(*q->second->getIs());
+ if(q->second->completed())
+ {
+ outAsync = q->second;
+ }
_asyncRequests.erase(q);
}
}
}
- if(outAsync && outAsync->__finished())
+ if(outAsync)
{
- outAsync->__invokeCompleted();
+ outAsync->invokeCompleted();
}
+
_adapter->decDirectCount();
}
@@ -563,12 +578,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException&
void
CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum)
{
- if(requestId > 0)
- {
- Lock sync(*this);
- _requests.erase(requestId);
- _asyncRequests.erase(requestId);
- }
+ handleException(requestId, ex);
_adapter->decDirectCount();
}
@@ -585,7 +595,7 @@ CollocatedRequestHandler::waitForConnection()
}
bool
-CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
+CollocatedRequestHandler::sent(OutgoingBase* out)
{
if(_reference->getInvocationTimeout() > 0)
{
@@ -600,7 +610,7 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out)
}
bool
-CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync)
+CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
{
if(_reference->getInvocationTimeout() > 0)
{
@@ -610,9 +620,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync)
return false; // The request timed-out.
}
}
- if(outAsync->__sent())
+ if(outAsync->sent())
{
- outAsync->__invokeSent();
+ outAsync->invokeSent();
}
return true;
}
@@ -684,7 +694,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
map<int, Outgoing*>::iterator p = _requests.find(requestId);
if(p != _requests.end())
{
- p->second->finished(ex);
+ p->second->completed(ex);
_requests.erase(p);
}
else
@@ -692,13 +702,17 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex)
map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId);
if(q != _asyncRequests.end())
{
- outAsync = q->second;
+ if(q->second->completed(ex))
+ {
+ outAsync = q->second;
+ }
_asyncRequests.erase(q);
}
}
}
+
if(outAsync)
{
- outAsync->__finished(ex);
+ outAsync->invokeCompleted();
}
}