diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
commit | 4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e (patch) | |
tree | 0eb73fac0730f0a7bb3046e9c3b2cba8a4185654 /cpp/src/Ice/RetryQueue.cpp | |
parent | Locator improvements to improve scalability when receiving many requests (diff) | |
download | ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.bz2 ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.xz ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.zip |
Fixed bug 3513 - AMI retry bug
Diffstat (limited to 'cpp/src/Ice/RetryQueue.cpp')
-rw-r--r-- | cpp/src/Ice/RetryQueue.cpp | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp new file mode 100644 index 00000000000..d6aba62f844 --- /dev/null +++ b/cpp/src/Ice/RetryQueue.cpp @@ -0,0 +1,92 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/RetryQueue.h> +#include <Ice/OutgoingAsync.h> +#include <Ice/LocalException.h> +#include <Ice/Instance.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } + +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncPtr& outAsync) : + _queue(queue), _outAsync(outAsync) +{ +} + +void +IceInternal::RetryTask::runTimerTask() +{ + if(_queue->remove(this)) + { + try + { + _outAsync->__send(); + } + catch(const Ice::LocalException& ex) + { + _outAsync->__releaseCallback(ex); + } + } +} + +void +IceInternal::RetryTask::destroy() +{ + _outAsync->__releaseCallback(CommunicatorDestroyedException(__FILE__, __LINE__)); +} + +bool +IceInternal::RetryTask::operator<(const RetryTask& rhs) const +{ + return this < &rhs; +} + +IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(instance) +{ +} + +void +IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval) +{ + Lock sync(*this); + RetryTaskPtr task = new RetryTask(this, out); + try + { + _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval)); + } + catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer. + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _requests.insert(task); +} + +void +IceInternal::RetryQueue::destroy() +{ + Lock sync(*this); + for(set<RetryTaskPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + _instance->timer()->cancel(*p); + (*p)->destroy(); + } + _requests.clear(); +} + +bool +IceInternal::RetryQueue::remove(const RetryTaskPtr& task) +{ + Lock sync(*this); + return _requests.erase(task) > 0; +} + |