summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectRequestHandler.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/ConnectRequestHandler.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp202
1 files changed, 86 insertions, 116 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 8563775deee..7ff4609d924 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -29,58 +29,26 @@ class FlushRequestsWithException : public DispatchWorkItem
{
public:
- FlushRequestsWithException(const InstancePtr& instance,
- const ConnectRequestHandlerPtr& handler,
- const Ice::LocalException& ex) :
- DispatchWorkItem(instance),
- _handler(handler),
- _exception(ex.ice_clone())
+ FlushRequestsWithException(const ConnectRequestHandlerPtr& handler) : _handler(handler)
{
}
virtual void
run()
{
- _handler->flushRequestsWithException(*_exception.get());
+ _handler->flushRequestsWithException();
}
private:
const ConnectRequestHandlerPtr _handler;
- const IceUtil::UniquePtr<Ice::LocalException> _exception;
-};
-
-class FlushRequestsWithExceptionWrapper : public DispatchWorkItem
-{
-public:
-
- FlushRequestsWithExceptionWrapper(const InstancePtr& instance,
- const ConnectRequestHandlerPtr& handler,
- const LocalExceptionWrapper& ex) :
- DispatchWorkItem(instance),
- _handler(handler),
- _exception(ex)
- {
- }
-
- virtual void
- run()
- {
- _handler->flushRequestsWithException(_exception);
- }
-
-private:
-
- const ConnectRequestHandlerPtr _handler;
- const LocalExceptionWrapper _exception;
};
class FlushSentRequests : public DispatchWorkItem
{
public:
- FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) :
- DispatchWorkItem(instance), _callbacks(callbacks)
+ FlushSentRequests(const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : _callbacks(callbacks)
{
}
@@ -100,12 +68,9 @@ private:
};
-ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref,
- const Ice::ObjectPrx& proxy,
- const Handle< ::IceDelegate::Ice::Object>& delegate) :
+ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) :
RequestHandler(ref),
_proxy(proxy),
- _delegate(delegate),
_batchAutoFlush(
ref->getInstance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0),
_initialized(false),
@@ -148,12 +113,19 @@ ConnectRequestHandler::prepareBatchRequest(BasicStream* os)
{
wait();
}
-
- if(!initialized())
+
+ try
{
- _batchRequestInProgress = true;
- _batchStream.swap(*os);
- return;
+ if(!initialized())
+ {
+ _batchRequestInProgress = true;
+ _batchStream.swap(*os);
+ return;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ throw RetryException(ex);
}
}
_connection->prepareBatchRequest(os);
@@ -164,7 +136,7 @@ ConnectRequestHandler::finishBatchRequest(BasicStream* os)
{
{
Lock sync(*this);
- if(!initialized())
+ if(!initialized()) // This can't throw until _batchRequestInProgress = false
{
assert(_batchRequestInProgress);
_batchRequestInProgress = false;
@@ -196,7 +168,7 @@ ConnectRequestHandler::abortBatchRequest()
{
{
Lock sync(*this);
- if(!initialized())
+ if(!initialized()) // This can't throw until _batchRequestInProgress = false
{
assert(_batchRequestInProgress);
_batchRequestInProgress = false;
@@ -205,7 +177,6 @@ ConnectRequestHandler::abortBatchRequest()
BasicStream dummy(_reference->getInstance().get(), Ice::currentProtocolEncoding, _batchAutoFlush);
_batchStream.swap(dummy);
_batchRequestsSize = sizeof(requestBatchHdr);
-
return;
}
}
@@ -217,12 +188,19 @@ ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out)
{
{
Lock sync(*this);
- if(!initialized())
+ try
{
- Request req;
- req.out = out;
- _requests.push_back(req);
- return false; // Not sent
+ if(!initialized())
+ {
+ Request req;
+ req.out = out;
+ _requests.push_back(req);
+ return false; // Not sent
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ throw RetryException(ex);
}
}
return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
@@ -233,12 +211,19 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o
{
{
Lock sync(*this);
- if(!initialized())
+ try
{
- Request req;
- req.outAsync = out;
- _requests.push_back(req);
- return AsyncStatusQueued;
+ if(!initialized())
+ {
+ Request req;
+ req.outAsync = out;
+ _requests.push_back(req);
+ return AsyncStatusQueued;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ throw RetryException(ex);
}
}
return out->__send(_connection, _compress, _response);
@@ -249,6 +234,11 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
{
{
Lock sync(*this);
+ if(_exception.get())
+ {
+ return; // The request has been notified of a failure already.
+ }
+
if(!initialized())
{
for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
@@ -270,35 +260,50 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out)
void
ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
+ bool timedOut = false;
{
Lock sync(*this);
+ if(_exception.get())
+ {
+ return; // The request has been notified of a failure already.
+ }
+
if(!initialized())
{
for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
if(p->outAsync.get() == outAsync.get())
{
- Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
- outAsync->__finished(ex, false);
+ timedOut = true;
_requests.erase(p);
- return;
+ break;
}
}
- assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
+ if(timedOut)
+ {
+ Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
+ outAsync->__finished(ex, false);
+ return;
+ }
_connection->asyncRequestTimedOut(outAsync);
}
-
+
Ice::ConnectionIPtr
ConnectRequestHandler::getConnection(bool waitInit)
{
if(waitInit)
{
+ Lock sync(*this);
+ if(_exception.get())
+ {
+ throw RetryException(*_exception.get());
+ }
+
//
// Wait for the connection establishment to complete or fail.
//
- Lock sync(*this);
while(!_initialized && !_exception.get())
{
wait();
@@ -354,7 +359,6 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
_exception.reset(ex.ice_clone());
_proxy = 0; // Break cyclic reference count.
- _delegate = 0; // Break cyclic reference count.
//
// If some requests were queued, we notify them of the failure. This is done from a thread
@@ -363,8 +367,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
//
if(!_requests.empty())
{
- const InstancePtr instance = _reference->getInstance();
- instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
}
notifyAll();
@@ -468,31 +471,35 @@ ConnectRequestHandler::flushRequests()
_requests.pop_front();
}
}
- catch(const LocalExceptionWrapper& ex)
+ catch(const RetryException& ex)
{
+ //
+ // If the connection dies shortly after connection
+ // establishment, we don't systematically retry on
+ // RetryException. We handle the exception like it
+ // was an exception that occured while sending the
+ // request.
+ //
Lock sync(*this);
assert(!_exception.get() && !_requests.empty());
_exception.reset(ex.get()->ice_clone());
- const InstancePtr instance = _reference->getInstance();
- instance->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(instance, this, ex));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
}
catch(const Ice::LocalException& ex)
{
Lock sync(*this);
assert(!_exception.get() && !_requests.empty());
_exception.reset(ex.ice_clone());
- const InstancePtr instance = _reference->getInstance();
- instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
}
if(!sentCallbacks.empty())
{
- const InstancePtr instance = _reference->getInstance();
- instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(sentCallbacks));
}
//
- // We've finished sending the queued requests and the request handler now send
+ // We've finished sending the queued requests and the request handler now sends
// the requests over the connection directly. It's time to substitute the
// request handler of the proxy with the more efficient connection request
// handler which does not have any synchronization. This also breaks the cyclic
@@ -500,7 +507,7 @@ ConnectRequestHandler::flushRequests()
//
if(_updateRequestHandler && !_exception.get())
{
- _proxy->__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress));
+ _proxy->__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress));
}
{
@@ -512,61 +519,24 @@ ConnectRequestHandler::flushRequests()
_flushing = false;
}
_proxy = 0; // Break cyclic reference count.
- _delegate = 0; // Break cyclic reference count.
notifyAll();
}
}
void
-ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex)
+ConnectRequestHandler::flushRequestsWithException()
{
- for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->out)
- {
- p->out->finished(ex, false);
- }
- else if(p->outAsync)
- {
- p->outAsync->__finished(ex, false);
- }
- else
- {
- assert(p->os);
- delete p->os;
- }
- }
- _requests.clear();
-}
+ assert(_exception.get());
-void
-ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& ex)
-{
for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
if(p->out)
{
- Outgoing* out = dynamic_cast<Outgoing*>(p->out);
- if(out)
- {
- out->finished(ex);
- }
- else
- {
- p->out->finished(*ex.get(), false);
- }
+ p->out->finished(*_exception.get(), false);
}
else if(p->outAsync)
{
- OutgoingAsync* outAsync = dynamic_cast<OutgoingAsync*>(p->outAsync.get());
- if(outAsync)
- {
- outAsync->__finished(ex);
- }
- else
- {
- p->outAsync->__finished(*ex.get(), false);
- }
+ p->outAsync->__finished(*_exception.get(), false);
}
else
{