diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 117 |
1 files changed, 77 insertions, 40 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 6aee1ddb8c8..8563775deee 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -89,7 +89,7 @@ public: { for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) { - (*p)->__sent(); + (*p)->__invokeSent(); } } @@ -212,58 +212,82 @@ ConnectRequestHandler::abortBatchRequest() _connection->abortBatchRequest(); } -Ice::ConnectionI* -ConnectRequestHandler::sendRequest(Outgoing* out) +bool +ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) { - // Must be called first, _compress might not be initialized before this returns. - Ice::ConnectionIPtr connection = getConnection(true); - assert(connection); - if(!connection->sendRequest(out, _compress, _response) || _response) - { - return _connection.get(); // The request hasn't been sent or we're expecting a response. - } - else { - return 0; // The request has been sent. + Lock sync(*this); + if(!initialized()) + { + Request req; + req.out = out; + _requests.push_back(req); + return false; // Not sent + } } + return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response. } AsyncStatus -ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncPtr& out) +ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out) { { Lock sync(*this); if(!initialized()) { Request req; - req.out = out; + req.outAsync = out; _requests.push_back(req); return AsyncStatusQueued; } } - return _connection->sendAsyncRequest(out, _compress, _response); + return out->__send(_connection, _compress, _response); } -bool -ConnectRequestHandler::flushBatchRequests(BatchOutgoing* out) +void +ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { - return getConnection(true)->flushBatchRequests(out); + { + Lock sync(*this); + if(!initialized()) + { + for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out == out) + { + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + out->finished(ex, false); + _requests.erase(p); + return; + } + } + assert(false); // The request has to be queued if it timed out and we're not initialized yet. + } + } + _connection->requestTimedOut(out); } -AsyncStatus -ConnectRequestHandler::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& out) +void +ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { { Lock sync(*this); if(!initialized()) { - Request req; - req.batchOut = out; - _requests.push_back(req); - return AsyncStatusQueued; + for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->outAsync.get() == outAsync.get()) + { + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + outAsync->__finished(ex, false); + _requests.erase(p); + return; + } + } + assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - return _connection->flushAsyncBatchRequests(out); + _connection->asyncRequestTimedOut(outAsync); } Ice::ConnectionIPtr @@ -413,16 +437,13 @@ ConnectRequestHandler::flushRequests() Request& req = _requests.front(); if(req.out) { - if(_connection->sendAsyncRequest(req.out, _compress, _response) & AsyncStatusInvokeSentCallback) - { - sentCallbacks.push_back(req.out); - } + req.out->send(_connection, _compress, _response); } - else if(req.batchOut) + else if(req.outAsync) { - if(_connection->flushAsyncBatchRequests(req.batchOut) & AsyncStatusInvokeSentCallback) + if(req.outAsync->__send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { - sentCallbacks.push_back(req.batchOut); + sentCallbacks.push_back(req.outAsync); } } else @@ -503,11 +524,11 @@ ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex) { if(p->out) { - p->out->__finished(ex, false); + p->out->finished(ex, false); } - else if(p->batchOut) - { - p->batchOut->__finished(ex, false); + else if(p->outAsync) + { + p->outAsync->__finished(ex, false); } else { @@ -525,11 +546,27 @@ ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& e { if(p->out) { - p->out->__finished(ex); + Outgoing* out = dynamic_cast<Outgoing*>(p->out); + if(out) + { + out->finished(ex); + } + else + { + p->out->finished(*ex.get(), false); + } } - else if(p->batchOut) - { - p->batchOut->__finished(*ex.get(), false); + else if(p->outAsync) + { + OutgoingAsync* outAsync = dynamic_cast<OutgoingAsync*>(p->outAsync.get()); + if(outAsync) + { + outAsync->__finished(ex); + } + else + { + p->outAsync->__finished(*ex.get(), false); + } } else { |