diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-11-27 14:34:15 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-11-27 14:34:15 +0100 |
commit | 08c354c1cb70485c1f0ed83f814b04383a24e233 (patch) | |
tree | 0229ebe7cf0cb38b8a00a34df7cfd8a90ddf8d3a /cpp/src | |
parent | ICE-5995 Use variable GRADLE not gradlew in makefile (diff) | |
download | ice-08c354c1cb70485c1f0ed83f814b04383a24e233.tar.bz2 ice-08c354c1cb70485c1f0ed83f814b04383a24e233.tar.xz ice-08c354c1cb70485c1f0ed83f814b04383a24e233.zip |
Fixed ICE-5985: Java background test failures
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.cpp | 195 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectRequestHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 28 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 30 | ||||
-rw-r--r-- | cpp/src/Ice/RequestHandlerFactory.cpp | 3 |
5 files changed, 154 insertions, 104 deletions
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 8c3b37d0cb5..cab236aa9e5 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -336,7 +336,41 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) _proxies.clear(); _proxy = 0; // Break cyclic reference count. - flushRequestsWithException(); + // + // NOTE: remove the request handler *before* notifying the + // requests that the connection failed. It's important to ensure + // that future invocations will obtain a new connect request + // handler once invocations are notified. + // + try + { + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } + + for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + if(p->out) + { + p->out->completed(*_exception.get()); + } + else if(p->outAsync) + { + if(p->outAsync->completed(*_exception.get())) + { + p->outAsync->invokeCompletedAsync(); + } + } + else + { + assert(p->os); + delete p->os; + } + } + _requests.clear(); notifyAll(); } @@ -399,40 +433,66 @@ ConnectRequestHandler::flushRequests() _flushing = true; } - try + while(!_requests.empty()) // _requests is immutable when _flushing = true { - while(!_requests.empty()) // _requests is immutable when _flushing = true + Request& req = _requests.front(); + if(req.out) { - Request& req = _requests.front(); - if(req.out) + try + { + req.out->send(_connection, _compress, _response); + } + catch(const RetryException& ex) { try { - req.out->send(_connection, _compress, _response); + // Remove the request handler before retrying. + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); } - catch(const Ice::DatagramLimitException& ex) + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore + } + req.out->retryException(*ex.get()); + } + catch(const Ice::Exception& ex) + { + req.out->completed(ex); + } + } + else if(req.outAsync) + { + try + { + if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { - req.out->completed(ex); + req.outAsync->invokeSentAsync(); } } - else if(req.outAsync) + catch(const RetryException& ex) { try { - if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) - { - req.outAsync->invokeSentAsync(); - } + // Remove the request handler before retrying. + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); } - catch(const Ice::DatagramLimitException& ex) + catch(const Ice::CommunicatorDestroyedException&) { - if(req.outAsync->completed(ex)) - { - req.outAsync->invokeCompletedAsync(); - } + // Ignore } + req.outAsync->retryException(*ex.get()); } - else + catch(const Ice::Exception& ex) + { + if(req.outAsync->completed(ex)) + { + req.outAsync->invokeCompletedAsync(); + } + } + } + else + { + try { BasicStream os(req.os->instance(), Ice::currentProtocolEncoding); _connection->prepareBatchRequest(&os); @@ -452,29 +512,16 @@ ConnectRequestHandler::flushRequests() _connection->finishBatchRequest(&os, _compress); delete req.os; } - _requests.pop_front(); + catch(const RetryException&) + { + delete req.os; + } + catch(const Ice::Exception&) + { + delete req.os; + } } - } - catch(const RetryException& ex) - { - // - // If the connection dies shortly after connection - // establishment, we don't systematically retry on - // RetryException. We handle the exception like it - // was an exception that occured while sending the - // request. - // - Lock sync(*this); - assert(!_exception.get() && !_requests.empty()); - _exception.reset(ex.get()->ice_clone()); - flushRequestsWithException(); - } - catch(const Ice::LocalException& ex) - { - Lock sync(*this); - assert(!_exception.get() && !_requests.empty()); - _exception.reset(ex.ice_clone()); - flushRequestsWithException(); + _requests.pop_front(); } // @@ -483,7 +530,7 @@ ConnectRequestHandler::flushRequests() // request handler to use the more efficient connection request // handler. // - if(_reference->getCacheConnection() && !_exception.get()) + if(_reference->getCacheConnection()) { _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) @@ -495,18 +542,19 @@ ConnectRequestHandler::flushRequests() { Lock sync(*this); assert(!_initialized); - if(!_exception.get()) + _initialized = true; + _flushing = false; + try { - _initialized = true; - _flushing = false; - try - { - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } + // + // Only remove once all the requests are flushed to + // guarantee serialization. + // + _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); + } + catch(const Ice::CommunicatorDestroyedException&) + { + // Ignore } _proxies.clear(); _proxy = 0; // Break cyclic reference count. @@ -514,44 +562,3 @@ ConnectRequestHandler::flushRequests() } } -void -ConnectRequestHandler::flushRequestsWithException() -{ - assert(_exception.get()); - - // - // NOTE: remove the request handler *before* notifying the - // requests that the connection failed. It's important to ensure - // that future invocations will obtain a new connect request - // handler once invocations are notified. - // - try - { - _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this); - } - catch(const Ice::CommunicatorDestroyedException&) - { - // Ignore - } - - for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - if(p->out) - { - p->out->completed(*_exception.get()); - } - else if(p->outAsync) - { - if(p->outAsync->completed(*_exception.get())) - { - p->outAsync->invokeCompletedAsync(); - } - } - else - { - assert(p->os); - delete p->os; - } - } - _requests.clear(); -} diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index 76f93803325..9bfb41a12f4 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -58,8 +58,6 @@ public: virtual void addedProxy(); - void flushRequestsWithException(); - private: bool initialized(); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 5f0361ba1ba..ee9464c28a8 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -196,6 +196,15 @@ Outgoing::completed(const Ice::Exception& ex) _monitor.notify(); } +void +Outgoing::retryException(const Ice::Exception&) +{ + Monitor<Mutex>::Lock sync(_monitor); + assert(_state <= StateInProgress); + _state = StateRetry; + _monitor.notify(); +} + bool Outgoing::invoke() { @@ -258,8 +267,12 @@ Outgoing::invoke() { Time now = Time::now(Time::Monotonic); timedOut = now >= _invocationTimeoutDeadline; - while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) + while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) { + if(timedOut) + { + break; + } _monitor.timedWait(_invocationTimeoutDeadline - now); if((_state == StateInProgress || !_sent) && _state != StateFailed) @@ -271,7 +284,7 @@ Outgoing::invoke() } else { - while((_state == StateInProgress || !_sent) && _state != StateFailed) + while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) { _monitor.wait(); } @@ -305,6 +318,11 @@ Outgoing::invoke() { _exception->ice_throw(); } + else if(_state == StateRetry) + { + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + continue; + } else { assert(_state != StateInProgress); @@ -715,3 +733,9 @@ FlushBatch::completed(const Ice::Exception& ex) _exception.reset(ex.ice_clone()); _monitor.notify(); } + +void +FlushBatch::retryException(const Ice::Exception& ex) +{ + completed(ex); +} diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 1719adc6b35..518867e06f2 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -40,6 +40,12 @@ OutgoingAsyncBase::completed(const Exception& ex) return finished(ex); } +void +OutgoingAsyncBase::retryException(const Exception& ex) +{ + assert(false); +} + OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, const InstancePtr& instance, const string& operation, @@ -112,6 +118,22 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc) } } +void +ProxyOutgoingAsyncBase::retryException(const Exception& ex) +{ + try + { + handleRetryException(ex); + _instance->retryQueue()->add(this, 0); + } + catch(const Ice::Exception& exc) + { + if(completed(exc)) + { + invokeCompletedAsync(); + } + } +} void ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler) @@ -212,7 +234,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread) } catch(const RetryException& ex) { - handleRetryException(ex); + handleRetryException(*ex.get()); } catch(const Exception& ex) { @@ -286,7 +308,7 @@ ProxyOutgoingAsyncBase::finished(bool ok) } void -ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc) +ProxyOutgoingAsyncBase::handleRetryException(const Ice::Exception&) { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry. } @@ -647,10 +669,10 @@ ProxyFlushBatch::invoke() } void -ProxyFlushBatch::handleRetryException(const RetryException& ex) +ProxyFlushBatch::handleRetryException(const Ice::Exception& ex) { _proxy->__setRequestHandler(_handler, 0); // Clear request handler - ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests + ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests } int diff --git a/cpp/src/Ice/RequestHandlerFactory.cpp b/cpp/src/Ice/RequestHandlerFactory.cpp index e21344c98d9..3a30cc8b182 100644 --- a/cpp/src/Ice/RequestHandlerFactory.cpp +++ b/cpp/src/Ice/RequestHandlerFactory.cpp @@ -61,8 +61,7 @@ IceInternal::RequestHandlerFactory::removeRequestHandler(const ReferencePtr& ref { Lock sync(*this); map<ReferencePtr, RequestHandlerPtr>::iterator p = _handlers.find(ref); - assert(p != _handlers.end() && p->second.get() == handler.get()); - if(p != _handlers.end()) + if(p != _handlers.end() && p->second.get() == handler.get()) { _handlers.erase(p); } |