summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp195
1 files changed, 101 insertions, 94 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 8c3b37d0cb5..cab236aa9e5 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -336,7 +336,41 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
- flushRequestsWithException();
+ //
+ // NOTE: remove the request handler *before* notifying the
+ // requests that the connection failed. It's important to ensure
+ // that future invocations will obtain a new connect request
+ // handler once invocations are notified.
+ //
+ try
+ {
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
+ }
+
+ for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->out)
+ {
+ p->out->completed(*_exception.get());
+ }
+ else if(p->outAsync)
+ {
+ if(p->outAsync->completed(*_exception.get()))
+ {
+ p->outAsync->invokeCompletedAsync();
+ }
+ }
+ else
+ {
+ assert(p->os);
+ delete p->os;
+ }
+ }
+ _requests.clear();
notifyAll();
}
@@ -399,40 +433,66 @@ ConnectRequestHandler::flushRequests()
_flushing = true;
}
- try
+ while(!_requests.empty()) // _requests is immutable when _flushing = true
{
- while(!_requests.empty()) // _requests is immutable when _flushing = true
+ Request& req = _requests.front();
+ if(req.out)
{
- Request& req = _requests.front();
- if(req.out)
+ try
+ {
+ req.out->send(_connection, _compress, _response);
+ }
+ catch(const RetryException& ex)
{
try
{
- req.out->send(_connection, _compress, _response);
+ // Remove the request handler before retrying.
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
}
- catch(const Ice::DatagramLimitException& ex)
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
+ }
+ req.out->retryException(*ex.get());
+ }
+ catch(const Ice::Exception& ex)
+ {
+ req.out->completed(ex);
+ }
+ }
+ else if(req.outAsync)
+ {
+ try
+ {
+ if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
- req.out->completed(ex);
+ req.outAsync->invokeSentAsync();
}
}
- else if(req.outAsync)
+ catch(const RetryException& ex)
{
try
{
- if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
- {
- req.outAsync->invokeSentAsync();
- }
+ // Remove the request handler before retrying.
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
}
- catch(const Ice::DatagramLimitException& ex)
+ catch(const Ice::CommunicatorDestroyedException&)
{
- if(req.outAsync->completed(ex))
- {
- req.outAsync->invokeCompletedAsync();
- }
+ // Ignore
}
+ req.outAsync->retryException(*ex.get());
}
- else
+ catch(const Ice::Exception& ex)
+ {
+ if(req.outAsync->completed(ex))
+ {
+ req.outAsync->invokeCompletedAsync();
+ }
+ }
+ }
+ else
+ {
+ try
{
BasicStream os(req.os->instance(), Ice::currentProtocolEncoding);
_connection->prepareBatchRequest(&os);
@@ -452,29 +512,16 @@ ConnectRequestHandler::flushRequests()
_connection->finishBatchRequest(&os, _compress);
delete req.os;
}
- _requests.pop_front();
+ catch(const RetryException&)
+ {
+ delete req.os;
+ }
+ catch(const Ice::Exception&)
+ {
+ delete req.os;
+ }
}
- }
- catch(const RetryException& ex)
- {
- //
- // If the connection dies shortly after connection
- // establishment, we don't systematically retry on
- // RetryException. We handle the exception like it
- // was an exception that occured while sending the
- // request.
- //
- Lock sync(*this);
- assert(!_exception.get() && !_requests.empty());
- _exception.reset(ex.get()->ice_clone());
- flushRequestsWithException();
- }
- catch(const Ice::LocalException& ex)
- {
- Lock sync(*this);
- assert(!_exception.get() && !_requests.empty());
- _exception.reset(ex.ice_clone());
- flushRequestsWithException();
+ _requests.pop_front();
}
//
@@ -483,7 +530,7 @@ ConnectRequestHandler::flushRequests()
// request handler to use the more efficient connection request
// handler.
//
- if(_reference->getCacheConnection() && !_exception.get())
+ if(_reference->getCacheConnection())
{
_connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
@@ -495,18 +542,19 @@ ConnectRequestHandler::flushRequests()
{
Lock sync(*this);
assert(!_initialized);
- if(!_exception.get())
+ _initialized = true;
+ _flushing = false;
+ try
{
- _initialized = true;
- _flushing = false;
- try
- {
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
+ //
+ // Only remove once all the requests are flushed to
+ // guarantee serialization.
+ //
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
}
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
@@ -514,44 +562,3 @@ ConnectRequestHandler::flushRequests()
}
}
-void
-ConnectRequestHandler::flushRequestsWithException()
-{
- assert(_exception.get());
-
- //
- // NOTE: remove the request handler *before* notifying the
- // requests that the connection failed. It's important to ensure
- // that future invocations will obtain a new connect request
- // handler once invocations are notified.
- //
- try
- {
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
-
- for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->out)
- {
- p->out->completed(*_exception.get());
- }
- else if(p->outAsync)
- {
- if(p->outAsync->completed(*_exception.get()))
- {
- p->outAsync->invokeCompletedAsync();
- }
- }
- else
- {
- assert(p->os);
- delete p->os;
- }
- }
- _requests.clear();
-}