diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
commit | 4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e (patch) | |
tree | 0eb73fac0730f0a7bb3046e9c3b2cba8a4185654 /cpp/src | |
parent | Locator improvements to improve scalability when receiving many requests (diff) | |
download | ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.bz2 ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.xz ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.zip |
Fixed bug 3513 - AMI retry bug
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 23 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.h | 3 | ||||
-rw-r--r-- | cpp/src/Ice/Makefile | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Makefile.mak | 1 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 24 | ||||
-rw-r--r-- | cpp/src/Ice/ProxyFactory.cpp | 59 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueue.cpp | 92 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueue.h | 62 | ||||
-rw-r--r-- | cpp/src/Ice/RetryQueueF.h | 24 |
9 files changed, 237 insertions, 52 deletions
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 8a1e5f45405..b26a1948a86 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -29,6 +29,7 @@ #include <Ice/LoggerI.h> #include <Ice/Network.h> #include <Ice/EndpointFactoryManager.h> +#include <Ice/RetryQueue.h> #include <Ice/TcpEndpointI.h> #include <Ice/UdpEndpointI.h> #include <Ice/DynamicLibrary.h> @@ -285,6 +286,19 @@ IceInternal::Instance::endpointHostResolver() return _endpointHostResolver; } +RetryQueuePtr +IceInternal::Instance::retryQueue() +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_state == StateDestroyed) + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + + return _retryQueue; +} + IceUtil::TimerPtr IceInternal::Instance::timer() { @@ -991,6 +1005,8 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _servantFactoryManager = new ObjectFactoryManager(); _objectAdapterFactory = new ObjectAdapterFactory(this, communicator); + + _retryQueue = new RetryQueue(this); if(_initData.wstringConverter == 0) { @@ -1039,6 +1055,7 @@ IceInternal::Instance::~Instance() assert(!_serverThreadPool); assert(!_selectorThread); assert(!_endpointHostResolver); + assert(!_retryQueue); assert(!_timer); assert(!_routerManager); assert(!_locatorManager); @@ -1200,6 +1217,11 @@ IceInternal::Instance::destroy() _outgoingConnectionFactory->waitUntilFinished(); } + if(_retryQueue) + { + _retryQueue->destroy(); + } + ThreadPoolPtr serverThreadPool; ThreadPoolPtr clientThreadPool; SelectorThreadPtr selectorThread; @@ -1210,6 +1232,7 @@ IceInternal::Instance::destroy() _objectAdapterFactory = 0; _outgoingConnectionFactory = 0; + _retryQueue = 0; if(_connectionMonitor) { diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index f0aa50a80b8..721b347b238 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -30,6 +30,7 @@ #include <Ice/ObjectFactoryManagerF.h> #include <Ice/ObjectAdapterFactoryF.h> #include <Ice/EndpointFactoryManagerF.h> +#include <Ice/RetryQueueF.h> #include <Ice/DynamicLibraryF.h> #include <Ice/PluginF.h> #include <Ice/Initialize.h> @@ -71,6 +72,7 @@ public: ThreadPoolPtr serverThreadPool(); SelectorThreadPtr selectorThread(); EndpointHostResolverPtr endpointHostResolver(); + RetryQueuePtr retryQueue(); IceUtil::TimerPtr timer(); EndpointFactoryManagerPtr endpointFactoryManager() const; DynamicLibraryListPtr dynamicLibraryList() const; @@ -134,6 +136,7 @@ private: ThreadPoolPtr _serverThreadPool; SelectorThreadPtr _selectorThread; EndpointHostResolverPtr _endpointHostResolver; + RetryQueuePtr _retryQueue; IceUtil::TimerPtr _timer; EndpointFactoryManagerPtr _endpointFactoryManager; DynamicLibraryListPtr _dynamicLibraryList; diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index f236dd482fc..dbf931e2b31 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -80,6 +80,7 @@ OBJS = Acceptor.o \ Proxy.o \ ReferenceFactory.o \ Reference.o \ + RetryQueue.o \ RequestHandler.o \ RouterInfo.o \ Router.o \ diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak index 3424f3e2439..a75f2467d44 100644 --- a/cpp/src/Ice/Makefile.mak +++ b/cpp/src/Ice/Makefile.mak @@ -81,6 +81,7 @@ OBJS = Acceptor.obj \ Proxy.obj \
ReferenceFactory.obj \
Reference.obj \
+ RetryQueue.obj \
RequestHandler.obj \
RouterInfo.obj \
Router.obj \
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 788cb2cb373..3ef1a299197 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -24,6 +24,7 @@ #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> +#include <Ice/RetryQueue.h> using namespace std; using namespace Ice; @@ -454,6 +455,24 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) } } +void +IceInternal::OutgoingAsync::__retry(int interval) +{ + // + // This method is called by the proxy to retry an invocation, no + // other threads can access this object. + // + if(interval > 0) + { + assert(__os); + __os->instance()->retryQueue()->add(this, interval); + } + else + { + __send(); + } +} + bool IceInternal::OutgoingAsync::__send() { @@ -466,11 +485,11 @@ IceInternal::OutgoingAsync::__send() } catch(const LocalExceptionWrapper& ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } catch(const Ice::LocalException& ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } return _sentSynchronously; } @@ -483,6 +502,7 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat _delegate = 0; _cnt = 0; _mode = mode; + _sentSynchronously = false; // // Can't call async via a batch proxy. diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index 2fc5316eb6d..eeb554d3e4a 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -25,32 +25,8 @@ using namespace std; using namespace Ice; using namespace IceInternal; -namespace -{ - -class RetryTask : public IceUtil::TimerTask -{ -public: - - RetryTask(const OutgoingAsyncPtr& out) : _out(out) - { - } - - virtual void - runTimerTask() - { - _out->__send(); - } - -private: - - const OutgoingAsyncPtr _out; -}; - -} - IceUtil::Shared* IceInternal::upCast(ProxyFactory* p) { return p; } - + ObjectPrx IceInternal::ProxyFactory::stringToProxy(const string& str) const { @@ -243,34 +219,17 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, } out << " because of exception\n" << ex; } - - if(interval > 0) + + if(out) { - if(out) - { - try - { - _instance->timer()->schedule(new RetryTask(out), IceUtil::Time::milliSeconds(interval)); - } - catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer. - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - } - else - { - // - // Sleep before retrying. - // - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); - } + out->__retry(interval); } - else + else if(interval > 0) { - if(out) - { - out->__send(); - } + // + // Sleep before retrying. + // + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); } } diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp new file mode 100644 index 00000000000..d6aba62f844 --- /dev/null +++ b/cpp/src/Ice/RetryQueue.cpp @@ -0,0 +1,92 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/RetryQueue.h> +#include <Ice/OutgoingAsync.h> +#include <Ice/LocalException.h> +#include <Ice/Instance.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } + +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncPtr& outAsync) : + _queue(queue), _outAsync(outAsync) +{ +} + +void +IceInternal::RetryTask::runTimerTask() +{ + if(_queue->remove(this)) + { + try + { + _outAsync->__send(); + } + catch(const Ice::LocalException& ex) + { + _outAsync->__releaseCallback(ex); + } + } +} + +void +IceInternal::RetryTask::destroy() +{ + _outAsync->__releaseCallback(CommunicatorDestroyedException(__FILE__, __LINE__)); +} + +bool +IceInternal::RetryTask::operator<(const RetryTask& rhs) const +{ + return this < &rhs; +} + +IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(instance) +{ +} + +void +IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval) +{ + Lock sync(*this); + RetryTaskPtr task = new RetryTask(this, out); + try + { + _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval)); + } + catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer. + { + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _requests.insert(task); +} + +void +IceInternal::RetryQueue::destroy() +{ + Lock sync(*this); + for(set<RetryTaskPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + _instance->timer()->cancel(*p); + (*p)->destroy(); + } + _requests.clear(); +} + +bool +IceInternal::RetryQueue::remove(const RetryTaskPtr& task) +{ + Lock sync(*this); + return _requests.erase(task) > 0; +} + diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h new file mode 100644 index 00000000000..960b4a8d220 --- /dev/null +++ b/cpp/src/Ice/RetryQueue.h @@ -0,0 +1,62 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_RETRY_QUEUE_H +#define ICE_RETRY_QUEUE_H + +#include <IceUtil/Shared.h> +#include <IceUtil/Mutex.h> +#include <IceUtil/Timer.h> +#include <Ice/RetryQueueF.h> +#include <Ice/OutgoingAsyncF.h> +#include <Ice/InstanceF.h> + +namespace IceInternal +{ + +class RetryTask : public IceUtil::TimerTask +{ +public: + + RetryTask(const RetryQueuePtr&, const OutgoingAsyncPtr&); + + virtual void runTimerTask(); + void destroy(); + + bool operator<(const RetryTask&) const; + +private: + + const RetryQueuePtr _queue; + const OutgoingAsyncPtr _outAsync; +}; +typedef IceUtil::Handle<RetryTask> RetryTaskPtr; + +class RetryQueue : public IceUtil::Shared, public IceUtil::Mutex +{ +public: + + RetryQueue(const InstancePtr&); + + void add(const OutgoingAsyncPtr&, int); + void destroy(); + +private: + + bool remove(const RetryTaskPtr&); + friend class RetryTask; + + const InstancePtr _instance; + std::set<RetryTaskPtr> _requests; +}; + +} + +#endif + diff --git a/cpp/src/Ice/RetryQueueF.h b/cpp/src/Ice/RetryQueueF.h new file mode 100644 index 00000000000..0e99fd7e881 --- /dev/null +++ b/cpp/src/Ice/RetryQueueF.h @@ -0,0 +1,24 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_RETRY_QUEUE_F_H +#define ICE_RETRY_QUEUE_F_H + +#include <Ice/Handle.h> + +namespace IceInternal +{ + +class RetryQueue; +IceUtil::Shared* upCast(RetryQueue*); +typedef Handle<RetryQueue> RetryQueuePtr; + +} + +#endif |