summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-10-28 15:37:54 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-10-28 15:37:54 +0100
commit4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e (patch)
tree0eb73fac0730f0a7bb3046e9c3b2cba8a4185654 /cpp/src
parentLocator improvements to improve scalability when receiving many requests (diff)
downloadice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.bz2
ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.xz
ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.zip
Fixed bug 3513 - AMI retry bug
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/Instance.cpp23
-rw-r--r--cpp/src/Ice/Instance.h3
-rw-r--r--cpp/src/Ice/Makefile1
-rw-r--r--cpp/src/Ice/Makefile.mak1
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp24
-rw-r--r--cpp/src/Ice/ProxyFactory.cpp59
-rw-r--r--cpp/src/Ice/RetryQueue.cpp92
-rw-r--r--cpp/src/Ice/RetryQueue.h62
-rw-r--r--cpp/src/Ice/RetryQueueF.h24
9 files changed, 237 insertions, 52 deletions
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index 8a1e5f45405..b26a1948a86 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -29,6 +29,7 @@
#include <Ice/LoggerI.h>
#include <Ice/Network.h>
#include <Ice/EndpointFactoryManager.h>
+#include <Ice/RetryQueue.h>
#include <Ice/TcpEndpointI.h>
#include <Ice/UdpEndpointI.h>
#include <Ice/DynamicLibrary.h>
@@ -285,6 +286,19 @@ IceInternal::Instance::endpointHostResolver()
return _endpointHostResolver;
}
+RetryQueuePtr
+IceInternal::Instance::retryQueue()
+{
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if(_state == StateDestroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+
+ return _retryQueue;
+}
+
IceUtil::TimerPtr
IceInternal::Instance::timer()
{
@@ -991,6 +1005,8 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_servantFactoryManager = new ObjectFactoryManager();
_objectAdapterFactory = new ObjectAdapterFactory(this, communicator);
+
+ _retryQueue = new RetryQueue(this);
if(_initData.wstringConverter == 0)
{
@@ -1039,6 +1055,7 @@ IceInternal::Instance::~Instance()
assert(!_serverThreadPool);
assert(!_selectorThread);
assert(!_endpointHostResolver);
+ assert(!_retryQueue);
assert(!_timer);
assert(!_routerManager);
assert(!_locatorManager);
@@ -1200,6 +1217,11 @@ IceInternal::Instance::destroy()
_outgoingConnectionFactory->waitUntilFinished();
}
+ if(_retryQueue)
+ {
+ _retryQueue->destroy();
+ }
+
ThreadPoolPtr serverThreadPool;
ThreadPoolPtr clientThreadPool;
SelectorThreadPtr selectorThread;
@@ -1210,6 +1232,7 @@ IceInternal::Instance::destroy()
_objectAdapterFactory = 0;
_outgoingConnectionFactory = 0;
+ _retryQueue = 0;
if(_connectionMonitor)
{
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index f0aa50a80b8..721b347b238 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -30,6 +30,7 @@
#include <Ice/ObjectFactoryManagerF.h>
#include <Ice/ObjectAdapterFactoryF.h>
#include <Ice/EndpointFactoryManagerF.h>
+#include <Ice/RetryQueueF.h>
#include <Ice/DynamicLibraryF.h>
#include <Ice/PluginF.h>
#include <Ice/Initialize.h>
@@ -71,6 +72,7 @@ public:
ThreadPoolPtr serverThreadPool();
SelectorThreadPtr selectorThread();
EndpointHostResolverPtr endpointHostResolver();
+ RetryQueuePtr retryQueue();
IceUtil::TimerPtr timer();
EndpointFactoryManagerPtr endpointFactoryManager() const;
DynamicLibraryListPtr dynamicLibraryList() const;
@@ -134,6 +136,7 @@ private:
ThreadPoolPtr _serverThreadPool;
SelectorThreadPtr _selectorThread;
EndpointHostResolverPtr _endpointHostResolver;
+ RetryQueuePtr _retryQueue;
IceUtil::TimerPtr _timer;
EndpointFactoryManagerPtr _endpointFactoryManager;
DynamicLibraryListPtr _dynamicLibraryList;
diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile
index f236dd482fc..dbf931e2b31 100644
--- a/cpp/src/Ice/Makefile
+++ b/cpp/src/Ice/Makefile
@@ -80,6 +80,7 @@ OBJS = Acceptor.o \
Proxy.o \
ReferenceFactory.o \
Reference.o \
+ RetryQueue.o \
RequestHandler.o \
RouterInfo.o \
Router.o \
diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak
index 3424f3e2439..a75f2467d44 100644
--- a/cpp/src/Ice/Makefile.mak
+++ b/cpp/src/Ice/Makefile.mak
@@ -81,6 +81,7 @@ OBJS = Acceptor.obj \
Proxy.obj \
ReferenceFactory.obj \
Reference.obj \
+ RetryQueue.obj \
RequestHandler.obj \
RouterInfo.obj \
Router.obj \
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 788cb2cb373..3ef1a299197 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -24,6 +24,7 @@
#include <Ice/ReplyStatus.h>
#include <Ice/ImplicitContextI.h>
#include <Ice/ThreadPool.h>
+#include <Ice/RetryQueue.h>
using namespace std;
using namespace Ice;
@@ -454,6 +455,24 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
}
}
+void
+IceInternal::OutgoingAsync::__retry(int interval)
+{
+ //
+ // This method is called by the proxy to retry an invocation, no
+ // other threads can access this object.
+ //
+ if(interval > 0)
+ {
+ assert(__os);
+ __os->instance()->retryQueue()->add(this, interval);
+ }
+ else
+ {
+ __send();
+ }
+}
+
bool
IceInternal::OutgoingAsync::__send()
{
@@ -466,11 +485,11 @@ IceInternal::OutgoingAsync::__send()
}
catch(const LocalExceptionWrapper& ex)
{
- handleException(ex);
+ handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
}
catch(const Ice::LocalException& ex)
{
- handleException(ex);
+ handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
}
return _sentSynchronously;
}
@@ -483,6 +502,7 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
_delegate = 0;
_cnt = 0;
_mode = mode;
+ _sentSynchronously = false;
//
// Can't call async via a batch proxy.
diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp
index 2fc5316eb6d..eeb554d3e4a 100644
--- a/cpp/src/Ice/ProxyFactory.cpp
+++ b/cpp/src/Ice/ProxyFactory.cpp
@@ -25,32 +25,8 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-namespace
-{
-
-class RetryTask : public IceUtil::TimerTask
-{
-public:
-
- RetryTask(const OutgoingAsyncPtr& out) : _out(out)
- {
- }
-
- virtual void
- runTimerTask()
- {
- _out->__send();
- }
-
-private:
-
- const OutgoingAsyncPtr _out;
-};
-
-}
-
IceUtil::Shared* IceInternal::upCast(ProxyFactory* p) { return p; }
-
+
ObjectPrx
IceInternal::ProxyFactory::stringToProxy(const string& str) const
{
@@ -243,34 +219,17 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex,
}
out << " because of exception\n" << ex;
}
-
- if(interval > 0)
+
+ if(out)
{
- if(out)
- {
- try
- {
- _instance->timer()->schedule(new RetryTask(out), IceUtil::Time::milliSeconds(interval));
- }
- catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer.
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
- }
- else
- {
- //
- // Sleep before retrying.
- //
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval));
- }
+ out->__retry(interval);
}
- else
+ else if(interval > 0)
{
- if(out)
- {
- out->__send();
- }
+ //
+ // Sleep before retrying.
+ //
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval));
}
}
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp
new file mode 100644
index 00000000000..d6aba62f844
--- /dev/null
+++ b/cpp/src/Ice/RetryQueue.cpp
@@ -0,0 +1,92 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/RetryQueue.h>
+#include <Ice/OutgoingAsync.h>
+#include <Ice/LocalException.h>
+#include <Ice/Instance.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; }
+
+IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncPtr& outAsync) :
+ _queue(queue), _outAsync(outAsync)
+{
+}
+
+void
+IceInternal::RetryTask::runTimerTask()
+{
+ if(_queue->remove(this))
+ {
+ try
+ {
+ _outAsync->__send();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _outAsync->__releaseCallback(ex);
+ }
+ }
+}
+
+void
+IceInternal::RetryTask::destroy()
+{
+ _outAsync->__releaseCallback(CommunicatorDestroyedException(__FILE__, __LINE__));
+}
+
+bool
+IceInternal::RetryTask::operator<(const RetryTask& rhs) const
+{
+ return this < &rhs;
+}
+
+IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(instance)
+{
+}
+
+void
+IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval)
+{
+ Lock sync(*this);
+ RetryTaskPtr task = new RetryTask(this, out);
+ try
+ {
+ _instance->timer()->schedule(task, IceUtil::Time::milliSeconds(interval));
+ }
+ catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer.
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ _requests.insert(task);
+}
+
+void
+IceInternal::RetryQueue::destroy()
+{
+ Lock sync(*this);
+ for(set<RetryTaskPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ _instance->timer()->cancel(*p);
+ (*p)->destroy();
+ }
+ _requests.clear();
+}
+
+bool
+IceInternal::RetryQueue::remove(const RetryTaskPtr& task)
+{
+ Lock sync(*this);
+ return _requests.erase(task) > 0;
+}
+
diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h
new file mode 100644
index 00000000000..960b4a8d220
--- /dev/null
+++ b/cpp/src/Ice/RetryQueue.h
@@ -0,0 +1,62 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_RETRY_QUEUE_H
+#define ICE_RETRY_QUEUE_H
+
+#include <IceUtil/Shared.h>
+#include <IceUtil/Mutex.h>
+#include <IceUtil/Timer.h>
+#include <Ice/RetryQueueF.h>
+#include <Ice/OutgoingAsyncF.h>
+#include <Ice/InstanceF.h>
+
+namespace IceInternal
+{
+
+class RetryTask : public IceUtil::TimerTask
+{
+public:
+
+ RetryTask(const RetryQueuePtr&, const OutgoingAsyncPtr&);
+
+ virtual void runTimerTask();
+ void destroy();
+
+ bool operator<(const RetryTask&) const;
+
+private:
+
+ const RetryQueuePtr _queue;
+ const OutgoingAsyncPtr _outAsync;
+};
+typedef IceUtil::Handle<RetryTask> RetryTaskPtr;
+
+class RetryQueue : public IceUtil::Shared, public IceUtil::Mutex
+{
+public:
+
+ RetryQueue(const InstancePtr&);
+
+ void add(const OutgoingAsyncPtr&, int);
+ void destroy();
+
+private:
+
+ bool remove(const RetryTaskPtr&);
+ friend class RetryTask;
+
+ const InstancePtr _instance;
+ std::set<RetryTaskPtr> _requests;
+};
+
+}
+
+#endif
+
diff --git a/cpp/src/Ice/RetryQueueF.h b/cpp/src/Ice/RetryQueueF.h
new file mode 100644
index 00000000000..0e99fd7e881
--- /dev/null
+++ b/cpp/src/Ice/RetryQueueF.h
@@ -0,0 +1,24 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_RETRY_QUEUE_F_H
+#define ICE_RETRY_QUEUE_F_H
+
+#include <Ice/Handle.h>
+
+namespace IceInternal
+{
+
+class RetryQueue;
+IceUtil::Shared* upCast(RetryQueue*);
+typedef Handle<RetryQueue> RetryQueuePtr;
+
+}
+
+#endif