diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-10-28 15:37:54 +0100 |
commit | 4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e (patch) | |
tree | 0eb73fac0730f0a7bb3046e9c3b2cba8a4185654 /java | |
parent | Locator improvements to improve scalability when receiving many requests (diff) | |
download | ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.bz2 ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.xz ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.zip |
Fixed bug 3513 - AMI retry bug
Diffstat (limited to 'java')
-rw-r--r-- | java/src/Ice/ObjectPrxHelperBase.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 22 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 17 | ||||
-rw-r--r-- | java/src/IceInternal/ProxyFactory.java | 43 | ||||
-rw-r--r-- | java/src/IceInternal/RetryQueue.java | 49 | ||||
-rw-r--r-- | java/src/IceInternal/RetryTask.java | 44 |
6 files changed, 140 insertions, 37 deletions
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 574ded0a9dd..261adfd6ba3 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -908,7 +908,7 @@ public class ObjectPrxHelperBase implements ObjectPrx if(out != null) { - out.__send(cnt); + out.__send(); } return cnt; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 1ee242ba63c..2578f5b018e 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -201,6 +201,17 @@ public final class Instance return _endpointHostResolver; } + synchronized public RetryQueue + retryQueue() + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + return _retryQueue; + } + synchronized public Timer timer() { @@ -713,6 +724,8 @@ public final class Instance _objectAdapterFactory = new ObjectAdapterFactory(this, communicator); + _retryQueue = new RetryQueue(this); + // // Add Process and Properties facets // @@ -753,6 +766,7 @@ public final class Instance IceUtilInternal.Assert.FinalizerAssert(_locatorManager == null); IceUtilInternal.Assert.FinalizerAssert(_endpointFactoryManager == null); IceUtilInternal.Assert.FinalizerAssert(_pluginManager == null); + IceUtilInternal.Assert.FinalizerAssert(_retryQueue == null); super.finalize(); } @@ -868,6 +882,11 @@ public final class Instance { _outgoingConnectionFactory.waitUntilFinished(); } + + if(_retryQueue != null) + { + _retryQueue.destroy(); + } ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; @@ -877,8 +896,8 @@ public final class Instance synchronized(this) { _objectAdapterFactory = null; - _outgoingConnectionFactory = null; + _retryQueue = null; if(_connectionMonitor != null) { @@ -1054,6 +1073,7 @@ public final class Instance private ThreadPool _serverThreadPool; private SelectorThread _selectorThread; private EndpointHostResolver _endpointHostResolver; + private RetryQueue _retryQueue; private Timer _timer; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 9359fe4e36f..12820c9c7fc 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -270,14 +270,22 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } public final void - __send(int cnt) + __retry(int cnt, int interval) { // // This method is called by the proxy to retry an invocation. It's safe to update // the count here without synchronization, no other threads can access this object. // _cnt = cnt; - __send(); + if(interval > 0) + { + assert(__os != null); + __os.instance().retryQueue().add(this, interval); + } + else + { + __send(); + } } public final boolean @@ -292,11 +300,11 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } catch(LocalExceptionWrapper ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } catch(Ice.LocalException ex) { - handleException(ex); + handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously } return _sentSynchronously; } @@ -310,6 +318,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback _delegate = null; _cnt = 0; _mode = mode; + _sentSynchronously = false; // // Can't call async via a batch proxy. diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java index ac05fe1d57b..1f24976e772 100644 --- a/java/src/IceInternal/ProxyFactory.java +++ b/java/src/IceInternal/ProxyFactory.java @@ -132,7 +132,7 @@ public final class ProxyFactory if(out != null) { - out.__send(cnt); + out.__retry(cnt, 0); } return cnt; // We must always retry, so we don't look at the retry count. } @@ -205,42 +205,23 @@ public final class ProxyFactory logger.trace(traceLevels.retryCat, s); } - if(interval > 0) + if(out != null) { - if(out != null) - { - final int count = cnt; - _instance.timer().schedule(new TimerTask() - { - public void - runTimerTask() - { - out.__send(count); - } - }, interval); - } - else - { - // - // Sleep before retrying. - // - try - { - Thread.currentThread().sleep(interval); - } - catch(InterruptedException ex1) - { - } - } + out.__retry(cnt, interval); } - else + else if(interval > 0) { - if(out != null) + // + // Sleep before retrying. + // + try + { + Thread.currentThread().sleep(interval); + } + catch(InterruptedException ex1) { - out.__send(cnt); } } - return cnt; } diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java new file mode 100644 index 00000000000..0e0065ab2df --- /dev/null +++ b/java/src/IceInternal/RetryQueue.java @@ -0,0 +1,49 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class RetryQueue +{ + RetryQueue(Instance instance) + { + _instance = instance; + } + + synchronized public void + add(OutgoingAsync outAsync, int interval) + { + RetryTask task = new RetryTask(this, outAsync); + _instance.timer().schedule(task, interval); + _requests.add(task); + } + + synchronized public void + destroy() + { + java.util.Iterator<RetryTask> p = _requests.iterator(); + while(p.hasNext()) + { + RetryTask task = p.next(); + _instance.timer().cancel(task); + task.destroy(); + } + _requests.clear(); + } + + synchronized boolean + remove(RetryTask task) + { + return _requests.remove(task); + } + + final private Instance _instance; + final private java.util.HashSet<RetryTask> _requests = new java.util.HashSet<RetryTask>(); +}; + diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java new file mode 100644 index 00000000000..5c17c8af621 --- /dev/null +++ b/java/src/IceInternal/RetryTask.java @@ -0,0 +1,44 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +class RetryTask implements TimerTask +{ + RetryTask(RetryQueue queue, OutgoingAsync outAsync) + { + _queue = queue; + _outAsync = outAsync; + } + + public void + runTimerTask() + { + if(_queue.remove(this)) + { + try + { + _outAsync.__send(); + } + catch(Ice.LocalException ex) + { + _outAsync.__releaseCallback(ex); + } + } + } + + public void + destroy() + { + _outAsync.__releaseCallback(new Ice.CommunicatorDestroyedException()); + } + + private final RetryQueue _queue; + private final OutgoingAsync _outAsync; +} |