diff options
Diffstat (limited to 'cpp/src/Ice/RetryQueue.cpp')
-rw-r--r-- | cpp/src/Ice/RetryQueue.cpp | 58 |
1 files changed, 46 insertions, 12 deletions
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index adfb6bf9440..c70ed379edd 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -18,7 +18,7 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } -IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultPtr& outAsync) : +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const ProxyOutgoingAsyncBasePtr& outAsync) : _queue(queue), _outAsync(outAsync) { } @@ -26,14 +26,7 @@ IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultP void IceInternal::RetryTask::runTimerTask() { - try - { - _outAsync->__processRetry(); - } - catch(const Ice::LocalException& ex) - { - _outAsync->__invokeExceptionAsync(ex); - } + _outAsync->retry(); // Retry again the invocation. // // NOTE: this must be called last, destroy() blocks until all task @@ -44,10 +37,37 @@ IceInternal::RetryTask::runTimerTask() _queue->remove(this); } +void +IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException&) +{ + assert(false); +} + +void +IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&) +{ + if(_queue->cancel(this)) + { + // + // We just retry the outgoing async now rather than marking it + // as finished. The retry will check for the cancellation + // exception and terminate appropriately the request. + // + _outAsync->retry(); + } +} + void IceInternal::RetryTask::destroy() { - _outAsync->__invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__)); + try + { + _outAsync->abort(CommunicatorDestroyedException(__FILE__, __LINE__)); + } + catch(const CommunicatorDestroyedException&) + { + // Abort shouldn't throw if there's no callback, ignore. + } } bool @@ -61,7 +81,7 @@ IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(ins } void -IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) +IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) { Lock sync(*this); if(!_instance) @@ -78,6 +98,7 @@ IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _requests.insert(task); + out->cancelable(task); } void @@ -119,4 +140,17 @@ IceInternal::RetryQueue::remove(const RetryTaskPtr& task) } } - +bool +IceInternal::RetryQueue::cancel(const RetryTaskPtr& task) +{ + Lock sync(*this); + if(_requests.erase(task) > 0) + { + if(!_instance && _requests.empty()) + { + notify(); // If we are destroying the queue, destroy is probably waiting on the queue to be empty. + } + return _instance->timer()->cancel(task); + } + return false; +} |