summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
commitd81701ca8182942b7936f9fd84a019b695e9c890 (patch)
treedc036c9d701fbbe1afad67782bd78572c0f61974 /cpp/src/Ice/ConnectRequestHandler.cpp
parentFixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff)
downloadice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2
ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz
ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp117
1 files changed, 77 insertions, 40 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 6aee1ddb8c8..8563775deee 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -89,7 +89,7 @@ public:
{
for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p)
{
- (*p)->__sent();
+ (*p)->__invokeSent();
}
}
@@ -212,58 +212,82 @@ ConnectRequestHandler::abortBatchRequest()
_connection->abortBatchRequest();
}
-Ice::ConnectionI*
-ConnectRequestHandler::sendRequest(Outgoing* out)
+bool
+ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out)
{
- // Must be called first, _compress might not be initialized before this returns.
- Ice::ConnectionIPtr connection = getConnection(true);
- assert(connection);
- if(!connection->sendRequest(out, _compress, _response) || _response)
- {
- return _connection.get(); // The request hasn't been sent or we're expecting a response.
- }
- else
{
- return 0; // The request has been sent.
+ Lock sync(*this);
+ if(!initialized())
+ {
+ Request req;
+ req.out = out;
+ _requests.push_back(req);
+ return false; // Not sent
+ }
}
+ return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
}
AsyncStatus
-ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out)
+ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out)
{
{
Lock sync(*this);
if(!initialized())
{
Request req;
- req.out = out;
+ req.outAsync = out;
_requests.push_back(req);
return AsyncStatusQueued;
}
}
- return _connection->sendAsyncRequest(out, _compress, _response);
+ return out->__send(_connection, _compress, _response);
}
-bool
-ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out)
+void
+ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
{
- return getConnection(true)->flushBatchRequests(out);
+ {
+ Lock sync(*this);
+ if(!initialized())
+ {
+ for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->out == out)
+ {
+ Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
+ out->finished(ex, false);
+ _requests.erase(p);
+ return;
+ }
+ }
+ assert(false); // The request has to be queued if it timed out and we're not initialized yet.
+ }
+ }
+ _connection->requestTimedOut(out);
}
-AsyncStatus
-ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out)
+void
+ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
{
Lock sync(*this);
if(!initialized())
{
- Request req;
- req.batchOut = out;
- _requests.push_back(req);
- return AsyncStatusQueued;
+ for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->outAsync.get() == outAsync.get())
+ {
+ Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
+ outAsync->__finished(ex, false);
+ _requests.erase(p);
+ return;
+ }
+ }
+ assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- return _connection->flushAsyncBatchRequests(out);
+ _connection->asyncRequestTimedOut(outAsync);
}
Ice::ConnectionIPtr
@@ -413,16 +437,13 @@ ConnectRequestHandler::flushRequests()
Request& req = _requests.front();
if(req.out)
{
- if(_connection->sendAsyncRequest(req.out, _compress, _response) & AsyncStatusInvokeSentCallback)
- {
- sentCallbacks.push_back(req.out);
- }
+ req.out->send(_connection, _compress, _response);
}
- else if(req.batchOut)
+ else if(req.outAsync)
{
- if(_connection->flushAsyncBatchRequests(req.batchOut) & AsyncStatusInvokeSentCallback)
+ if(req.outAsync->__send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
- sentCallbacks.push_back(req.batchOut);
+ sentCallbacks.push_back(req.outAsync);
}
}
else
@@ -503,11 +524,11 @@ ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex)
{
if(p->out)
{
- p->out->__finished(ex, false);
+ p->out->finished(ex, false);
}
- else if(p->batchOut)
- {
- p->batchOut->__finished(ex, false);
+ else if(p->outAsync)
+ {
+ p->outAsync->__finished(ex, false);
}
else
{
@@ -525,11 +546,27 @@ ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& e
{
if(p->out)
{
- p->out->__finished(ex);
+ Outgoing* out = dynamic_cast<Outgoing*>(p->out);
+ if(out)
+ {
+ out->finished(ex);
+ }
+ else
+ {
+ p->out->finished(*ex.get(), false);
+ }
}
- else if(p->batchOut)
- {
- p->batchOut->__finished(*ex.get(), false);
+ else if(p->outAsync)
+ {
+ OutgoingAsync* outAsync = dynamic_cast<OutgoingAsync*>(p->outAsync.get());
+ if(outAsync)
+ {
+ outAsync->__finished(ex);
+ }
+ else
+ {
+ p->outAsync->__finished(*ex.get(), false);
+ }
}
else
{