diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/ConnectRequestHandler.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/ConnectRequestHandler.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 202 |
1 files changed, 86 insertions, 116 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 8563775deee..7ff4609d924 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -29,58 +29,26 @@ class FlushRequestsWithException : public DispatchWorkItem { public: - FlushRequestsWithException(const InstancePtr& instance, - const ConnectRequestHandlerPtr& handler, - const Ice::LocalException& ex) : - DispatchWorkItem(instance), - _handler(handler), - _exception(ex.ice_clone()) + FlushRequestsWithException(const ConnectRequestHandlerPtr& handler) : _handler(handler) { } virtual void run() { - _handler->flushRequestsWithException(*_exception.get()); + _handler->flushRequestsWithException(); } private: const ConnectRequestHandlerPtr _handler; - const IceUtil::UniquePtr<Ice::LocalException> _exception; -}; - -class FlushRequestsWithExceptionWrapper : public DispatchWorkItem -{ -public: - - FlushRequestsWithExceptionWrapper(const InstancePtr& instance, - const ConnectRequestHandlerPtr& handler, - const LocalExceptionWrapper& ex) : - DispatchWorkItem(instance), - _handler(handler), - _exception(ex) - { - } - - virtual void - run() - { - _handler->flushRequestsWithException(_exception); - } - -private: - - const ConnectRequestHandlerPtr _handler; - const LocalExceptionWrapper _exception; }; class FlushSentRequests : public DispatchWorkItem { public: - FlushSentRequests(const InstancePtr& instance, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : - DispatchWorkItem(instance), _callbacks(callbacks) + FlushSentRequests(const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : _callbacks(callbacks) { } @@ -100,12 +68,9 @@ private: }; -ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, - const Ice::ObjectPrx& proxy, - const Handle< ::IceDelegate::Ice::Object>& delegate) : +ConnectRequestHandler::ConnectRequestHandler(const ReferencePtr& ref, const Ice::ObjectPrx& proxy) : RequestHandler(ref), _proxy(proxy), - _delegate(delegate), _batchAutoFlush( ref->getInstance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0), _initialized(false), @@ -148,12 +113,19 @@ ConnectRequestHandler::prepareBatchRequest(BasicStream* os) { wait(); } - - if(!initialized()) + + try { - _batchRequestInProgress = true; - _batchStream.swap(*os); - return; + if(!initialized()) + { + _batchRequestInProgress = true; + _batchStream.swap(*os); + return; + } + } + catch(const Ice::LocalException& ex) + { + throw RetryException(ex); } } _connection->prepareBatchRequest(os); @@ -164,7 +136,7 @@ ConnectRequestHandler::finishBatchRequest(BasicStream* os) { { Lock sync(*this); - if(!initialized()) + if(!initialized()) // This can't throw until _batchRequestInProgress = false { assert(_batchRequestInProgress); _batchRequestInProgress = false; @@ -196,7 +168,7 @@ ConnectRequestHandler::abortBatchRequest() { { Lock sync(*this); - if(!initialized()) + if(!initialized()) // This can't throw until _batchRequestInProgress = false { assert(_batchRequestInProgress); _batchRequestInProgress = false; @@ -205,7 +177,6 @@ ConnectRequestHandler::abortBatchRequest() BasicStream dummy(_reference->getInstance().get(), Ice::currentProtocolEncoding, _batchAutoFlush); _batchStream.swap(dummy); _batchRequestsSize = sizeof(requestBatchHdr); - return; } } @@ -217,12 +188,19 @@ ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) { { Lock sync(*this); - if(!initialized()) + try { - Request req; - req.out = out; - _requests.push_back(req); - return false; // Not sent + if(!initialized()) + { + Request req; + req.out = out; + _requests.push_back(req); + return false; // Not sent + } + } + catch(const Ice::LocalException& ex) + { + throw RetryException(ex); } } return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response. @@ -233,12 +211,19 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o { { Lock sync(*this); - if(!initialized()) + try { - Request req; - req.outAsync = out; - _requests.push_back(req); - return AsyncStatusQueued; + if(!initialized()) + { + Request req; + req.outAsync = out; + _requests.push_back(req); + return AsyncStatusQueued; + } + } + catch(const Ice::LocalException& ex) + { + throw RetryException(ex); } } return out->__send(_connection, _compress, _response); @@ -249,6 +234,11 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { { Lock sync(*this); + if(_exception.get()) + { + return; // The request has been notified of a failure already. + } + if(!initialized()) { for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) @@ -270,35 +260,50 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) void ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) { + bool timedOut = false; { Lock sync(*this); + if(_exception.get()) + { + return; // The request has been notified of a failure already. + } + if(!initialized()) { for(deque<Request>::iterator p = _requests.begin(); p != _requests.end(); ++p) { if(p->outAsync.get() == outAsync.get()) { - Ice::InvocationTimeoutException ex(__FILE__, __LINE__); - outAsync->__finished(ex, false); + timedOut = true; _requests.erase(p); - return; + break; } } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } + if(timedOut) + { + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + outAsync->__finished(ex, false); + return; + } _connection->asyncRequestTimedOut(outAsync); } - + Ice::ConnectionIPtr ConnectRequestHandler::getConnection(bool waitInit) { if(waitInit) { + Lock sync(*this); + if(_exception.get()) + { + throw RetryException(*_exception.get()); + } + // // Wait for the connection establishment to complete or fail. // - Lock sync(*this); while(!_initialized && !_exception.get()) { wait(); @@ -354,7 +359,6 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) _exception.reset(ex.ice_clone()); _proxy = 0; // Break cyclic reference count. - _delegate = 0; // Break cyclic reference count. // // If some requests were queued, we notify them of the failure. This is done from a thread @@ -363,8 +367,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // if(!_requests.empty()) { - const InstancePtr instance = _reference->getInstance(); - instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); } notifyAll(); @@ -468,31 +471,35 @@ ConnectRequestHandler::flushRequests() _requests.pop_front(); } } - catch(const LocalExceptionWrapper& ex) + catch(const RetryException& ex) { + // + // If the connection dies shortly after connection + // establishment, we don't systematically retry on + // RetryException. We handle the exception like it + // was an exception that occured while sending the + // request. + // Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.get()->ice_clone()); - const InstancePtr instance = _reference->getInstance(); - instance->clientThreadPool()->execute(new FlushRequestsWithExceptionWrapper(instance, this, ex)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); } catch(const Ice::LocalException& ex) { Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.ice_clone()); - const InstancePtr instance = _reference->getInstance(); - instance->clientThreadPool()->execute(new FlushRequestsWithException(instance, this, ex)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); } if(!sentCallbacks.empty()) { - const InstancePtr instance = _reference->getInstance(); - instance->clientThreadPool()->execute(new FlushSentRequests(instance, sentCallbacks)); + _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(sentCallbacks)); } // - // We've finished sending the queued requests and the request handler now send + // We've finished sending the queued requests and the request handler now sends // the requests over the connection directly. It's time to substitute the // request handler of the proxy with the more efficient connection request // handler which does not have any synchronization. This also breaks the cyclic @@ -500,7 +507,7 @@ ConnectRequestHandler::flushRequests() // if(_updateRequestHandler && !_exception.get()) { - _proxy->__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress)); + _proxy->__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); } { @@ -512,61 +519,24 @@ ConnectRequestHandler::flushRequests() _flushing = false; } _proxy = 0; // Break cyclic reference count. - _delegate = 0; // Break cyclic reference count. notifyAll(); } } void -ConnectRequestHandler::flushRequestsWithException(const Ice::LocalException& ex) +ConnectRequestHandler::flushRequestsWithException() { - for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out) - { - p->out->finished(ex, false); - } - else if(p->outAsync) - { - p->outAsync->__finished(ex, false); - } - else - { - assert(p->os); - delete p->os; - } - } - _requests.clear(); -} + assert(_exception.get()); -void -ConnectRequestHandler::flushRequestsWithException(const LocalExceptionWrapper& ex) -{ for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { if(p->out) { - Outgoing* out = dynamic_cast<Outgoing*>(p->out); - if(out) - { - out->finished(ex); - } - else - { - p->out->finished(*ex.get(), false); - } + p->out->finished(*_exception.get(), false); } else if(p->outAsync) { - OutgoingAsync* outAsync = dynamic_cast<OutgoingAsync*>(p->outAsync.get()); - if(outAsync) - { - outAsync->__finished(ex); - } - else - { - p->outAsync->__finished(*ex.get(), false); - } + p->outAsync->__finished(*_exception.get(), false); } else { |