summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-10-28 15:37:54 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-10-28 15:37:54 +0100
commit4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e (patch)
tree0eb73fac0730f0a7bb3046e9c3b2cba8a4185654 /java
parentLocator improvements to improve scalability when receiving many requests (diff)
downloadice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.bz2
ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.tar.xz
ice-4cf71ddedfbcb89f1f8892c87bc6df68f1086c0e.zip
Fixed bug 3513 - AMI retry bug
Diffstat (limited to 'java')
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java2
-rw-r--r--java/src/IceInternal/Instance.java22
-rw-r--r--java/src/IceInternal/OutgoingAsync.java17
-rw-r--r--java/src/IceInternal/ProxyFactory.java43
-rw-r--r--java/src/IceInternal/RetryQueue.java49
-rw-r--r--java/src/IceInternal/RetryTask.java44
6 files changed, 140 insertions, 37 deletions
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 574ded0a9dd..261adfd6ba3 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -908,7 +908,7 @@ public class ObjectPrxHelperBase implements ObjectPrx
if(out != null)
{
- out.__send(cnt);
+ out.__send();
}
return cnt;
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 1ee242ba63c..2578f5b018e 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -201,6 +201,17 @@ public final class Instance
return _endpointHostResolver;
}
+ synchronized public RetryQueue
+ retryQueue()
+ {
+ if(_state == StateDestroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
+ return _retryQueue;
+ }
+
synchronized public Timer
timer()
{
@@ -713,6 +724,8 @@ public final class Instance
_objectAdapterFactory = new ObjectAdapterFactory(this, communicator);
+ _retryQueue = new RetryQueue(this);
+
//
// Add Process and Properties facets
//
@@ -753,6 +766,7 @@ public final class Instance
IceUtilInternal.Assert.FinalizerAssert(_locatorManager == null);
IceUtilInternal.Assert.FinalizerAssert(_endpointFactoryManager == null);
IceUtilInternal.Assert.FinalizerAssert(_pluginManager == null);
+ IceUtilInternal.Assert.FinalizerAssert(_retryQueue == null);
super.finalize();
}
@@ -868,6 +882,11 @@ public final class Instance
{
_outgoingConnectionFactory.waitUntilFinished();
}
+
+ if(_retryQueue != null)
+ {
+ _retryQueue.destroy();
+ }
ThreadPool serverThreadPool = null;
ThreadPool clientThreadPool = null;
@@ -877,8 +896,8 @@ public final class Instance
synchronized(this)
{
_objectAdapterFactory = null;
-
_outgoingConnectionFactory = null;
+ _retryQueue = null;
if(_connectionMonitor != null)
{
@@ -1054,6 +1073,7 @@ public final class Instance
private ThreadPool _serverThreadPool;
private SelectorThread _selectorThread;
private EndpointHostResolver _endpointHostResolver;
+ private RetryQueue _retryQueue;
private Timer _timer;
private EndpointFactoryManager _endpointFactoryManager;
private Ice.PluginManager _pluginManager;
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 9359fe4e36f..12820c9c7fc 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -270,14 +270,22 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
}
public final void
- __send(int cnt)
+ __retry(int cnt, int interval)
{
//
// This method is called by the proxy to retry an invocation. It's safe to update
// the count here without synchronization, no other threads can access this object.
//
_cnt = cnt;
- __send();
+ if(interval > 0)
+ {
+ assert(__os != null);
+ __os.instance().retryQueue().add(this, interval);
+ }
+ else
+ {
+ __send();
+ }
}
public final boolean
@@ -292,11 +300,11 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
}
catch(LocalExceptionWrapper ex)
{
- handleException(ex);
+ handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
}
catch(Ice.LocalException ex)
{
- handleException(ex);
+ handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
}
return _sentSynchronously;
}
@@ -310,6 +318,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
_delegate = null;
_cnt = 0;
_mode = mode;
+ _sentSynchronously = false;
//
// Can't call async via a batch proxy.
diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java
index ac05fe1d57b..1f24976e772 100644
--- a/java/src/IceInternal/ProxyFactory.java
+++ b/java/src/IceInternal/ProxyFactory.java
@@ -132,7 +132,7 @@ public final class ProxyFactory
if(out != null)
{
- out.__send(cnt);
+ out.__retry(cnt, 0);
}
return cnt; // We must always retry, so we don't look at the retry count.
}
@@ -205,42 +205,23 @@ public final class ProxyFactory
logger.trace(traceLevels.retryCat, s);
}
- if(interval > 0)
+ if(out != null)
{
- if(out != null)
- {
- final int count = cnt;
- _instance.timer().schedule(new TimerTask()
- {
- public void
- runTimerTask()
- {
- out.__send(count);
- }
- }, interval);
- }
- else
- {
- //
- // Sleep before retrying.
- //
- try
- {
- Thread.currentThread().sleep(interval);
- }
- catch(InterruptedException ex1)
- {
- }
- }
+ out.__retry(cnt, interval);
}
- else
+ else if(interval > 0)
{
- if(out != null)
+ //
+ // Sleep before retrying.
+ //
+ try
+ {
+ Thread.currentThread().sleep(interval);
+ }
+ catch(InterruptedException ex1)
{
- out.__send(cnt);
}
}
-
return cnt;
}
diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java
new file mode 100644
index 00000000000..0e0065ab2df
--- /dev/null
+++ b/java/src/IceInternal/RetryQueue.java
@@ -0,0 +1,49 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class RetryQueue
+{
+ RetryQueue(Instance instance)
+ {
+ _instance = instance;
+ }
+
+ synchronized public void
+ add(OutgoingAsync outAsync, int interval)
+ {
+ RetryTask task = new RetryTask(this, outAsync);
+ _instance.timer().schedule(task, interval);
+ _requests.add(task);
+ }
+
+ synchronized public void
+ destroy()
+ {
+ java.util.Iterator<RetryTask> p = _requests.iterator();
+ while(p.hasNext())
+ {
+ RetryTask task = p.next();
+ _instance.timer().cancel(task);
+ task.destroy();
+ }
+ _requests.clear();
+ }
+
+ synchronized boolean
+ remove(RetryTask task)
+ {
+ return _requests.remove(task);
+ }
+
+ final private Instance _instance;
+ final private java.util.HashSet<RetryTask> _requests = new java.util.HashSet<RetryTask>();
+};
+
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
new file mode 100644
index 00000000000..5c17c8af621
--- /dev/null
+++ b/java/src/IceInternal/RetryTask.java
@@ -0,0 +1,44 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+class RetryTask implements TimerTask
+{
+ RetryTask(RetryQueue queue, OutgoingAsync outAsync)
+ {
+ _queue = queue;
+ _outAsync = outAsync;
+ }
+
+ public void
+ runTimerTask()
+ {
+ if(_queue.remove(this))
+ {
+ try
+ {
+ _outAsync.__send();
+ }
+ catch(Ice.LocalException ex)
+ {
+ _outAsync.__releaseCallback(ex);
+ }
+ }
+ }
+
+ public void
+ destroy()
+ {
+ _outAsync.__releaseCallback(new Ice.CommunicatorDestroyedException());
+ }
+
+ private final RetryQueue _queue;
+ private final OutgoingAsync _outAsync;
+}