diff options
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 195 |
1 files changed, 101 insertions, 94 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 8c3b37d0cb5..cab236aa9e5 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -336,7 +336,41 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) _proxies.clear(); _proxy = 0; // Break cyclic reference count. - flushRequestsWithException(); + // + // NOTE: remove the request handler *before* notifying the + // requests that the connection failed. It's important to ensure + // that future invocations will obtain a new connect request + // handler once invocations are notified. + // + try + { + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } + + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->completed(*_exception.get()); + } + else if(p->outAsync) + { + if(p->outAsync->completed(*_exception.get())) + { + p->outAsync->invokeCompletedAsync(); + } + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); notifyAll(); } @@ -399,40 +433,66 @@ ConnectRequestHandler::flushRequests() _flushing = true; } - try + while(!_requests.empty()) // _requests is immutable when _flushing = true { - while(!_requests.empty()) // _requests is immutable when _flushing = true + Request& req = _requests.front(); + if(req.out) { - Request& req = _requests.front(); - if(req.out) + try + { + req.out->send(_connection, _compress, _response); + } + catch(const RetryException& ex) { try { - req.out->send(_connection, _compress, _response); + // Remove the request handler before retrying. + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); } - catch(const Ice::DatagramLimitException& ex) + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } + req.out->retryException(*ex.get()); + } + catch(const Ice::Exception& ex) + { + req.out->completed(ex); + } + } + else if(req.outAsync) + { + try + { + if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { - req.out->completed(ex); + req.outAsync->invokeSentAsync(); } } - else if(req.outAsync) + catch(const RetryException& ex) { try { - if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) - { - req.outAsync->invokeSentAsync(); - } + // Remove the request handler before retrying. + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); } - catch(const Ice::DatagramLimitException& ex) + catch(const Ice::CommunicatorDestroyedException&) { - if(req.outAsync->completed(ex)) - { - req.outAsync->invokeCompletedAsync(); - } + // Ignore } + req.outAsync->retryException(*ex.get()); } - else + catch(const Ice::Exception& ex) + { + if(req.outAsync->completed(ex)) + { + req.outAsync->invokeCompletedAsync(); + } + } + } + else + { + try { BasicStream os(req.os->instance(), Ice::currentProtocolEncoding); _connection->prepareBatchRequest(&os); @@ -452,29 +512,16 @@ ConnectRequestHandler::flushRequests() _connection->finishBatchRequest(&os, _compress); delete req.os; } - _requests.pop_front(); + catch(const RetryException&) + { + delete req.os; + } + catch(const Ice::Exception&) + { + delete req.os; + } } - } - catch(const RetryException& ex) - { - // - // If the connection dies shortly after connection - // establishment, we don't systematically retry on - // RetryException. We handle the exception like it - // was an exception that occured while sending the - // request. - // - Lock sync(*this); - assert(!_exception.get() && !_requests.empty()); - _exception.reset(ex.get()->ice_clone()); - flushRequestsWithException(); - } - catch(const Ice::LocalException& ex) - { - Lock sync(*this); - assert(!_exception.get() && !_requests.empty()); - _exception.reset(ex.ice_clone()); - flushRequestsWithException(); + _requests.pop_front(); } // @@ -483,7 +530,7 @@ ConnectRequestHandler::flushRequests() // request handler to use the more efficient connection request // handler. // - if(_reference->getCacheConnection() && !_exception.get()) + if(_reference->getCacheConnection()) { _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) @@ -495,18 +542,19 @@ ConnectRequestHandler::flushRequests() { Lock sync(*this); assert(!_initialized); - if(!_exception.get()) + _initialized = true; + _flushing = false; + try { - _initialized = true; - _flushing = false; - try - { - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } + // + // Only remove once all the requests are flushed to + // guarantee serialization. + // + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore } _proxies.clear(); _proxy = 0; // Break cyclic reference count. @@ -514,44 +562,3 @@ ConnectRequestHandler::flushRequests() } } -void -ConnectRequestHandler::flushRequestsWithException() -{ - assert(_exception.get()); - - // - // NOTE: remove the request handler *before* notifying the - // requests that the connection failed. It's important to ensure - // that future invocations will obtain a new connect request - // handler once invocations are notified. - // - try - { - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } - - for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out) - { - p->out->completed(*_exception.get()); - } - else if(p->outAsync) - { - if(p->outAsync->completed(*_exception.get())) - { - p->outAsync->invokeCompletedAsync(); - } - } - else - { - assert(p->os); - delete p->os; - } - } - _requests.clear(); -} |