diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/ConnectionMonitor.java | 151 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 28 | ||||
-rw-r--r-- | java/src/IceInternal/Timer.java | 298 |
3 files changed, 377 insertions, 100 deletions
diff --git a/java/src/IceInternal/ConnectionMonitor.java b/java/src/IceInternal/ConnectionMonitor.java index de44f64fe71..7f46d20f19b 100644 --- a/java/src/IceInternal/ConnectionMonitor.java +++ b/java/src/IceInternal/ConnectionMonitor.java @@ -9,39 +9,20 @@ package IceInternal; -public final class ConnectionMonitor extends Thread +public final class ConnectionMonitor implements IceInternal.TimerTask { // // Renamed from destroy to _destroy to avoid a deprecation warning caused // by the destroy method inherited from Thread. // - public void - _destroy() + synchronized public void + destroy() { - synchronized(this) - { - assert(_instance != null); - - _instance = null; - _connections = null; - - notify(); - } - - while(true) - { - try - { - join(); - break; - } - catch(java.lang.InterruptedException ex) - { - continue; - } - } + assert(_instance != null); + _instance = null; + _connections = null; } - + public synchronized void add(Ice.ConnectionI connection) { @@ -61,18 +42,10 @@ public final class ConnectionMonitor extends Thread // ConnectionMonitor(Instance instance, int interval) { + assert(interval > 0); _instance = instance; - _interval = interval; - String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); - if(threadName.length() > 0) - { - threadName += "-"; - } - setName(threadName + "Ice.ConnectionMonitor"); - - assert(_interval > 0); - start(); + _instance.timer().scheduleRepeated(this, interval * 1000); } protected synchronized void @@ -90,85 +63,67 @@ public final class ConnectionMonitor extends Thread { java.util.HashSet connections = new java.util.HashSet(); - while(true) + synchronized(this) { - synchronized(this) + if(_instance == null) { - if(_instance != null) - { - try - { - wait(_interval * 1000); - } - catch(InterruptedException ex) - { - continue; - } - } - - if(_instance == null) - { - return; - } - - connections.clear(); - connections.addAll(_connections); + return; } + + connections.clear(); + connections.addAll(_connections); + } - // - // Monitor connections outside the thread synchronization, - // so that connections can be added or removed during - // monitoring. - // - java.util.Iterator iter = connections.iterator(); - while(iter.hasNext()) + // + // Monitor connections outside the thread synchronization, + // so that connections can be added or removed during + // monitoring. + // + java.util.Iterator iter = connections.iterator(); + while(iter.hasNext()) + { + Ice.ConnectionI connection = (Ice.ConnectionI)iter.next(); + + try + { + connection.monitor(); + } + catch(Ice.LocalException ex) { - Ice.ConnectionI connection = (Ice.ConnectionI)iter.next(); - - try - { - connection.monitor(); - } - catch(Ice.LocalException ex) + synchronized(this) { - synchronized(this) + if(_instance == null) { - if(_instance == null) - { - return; - } - - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in connection monitor thread " + getName() + ":\n" + - sw.toString(); - _instance.initializationData().logger.error(s); + return; } + + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in connection monitor:\n" + sw.toString(); + _instance.initializationData().logger.error(s); } - catch(java.lang.Exception ex) + } + catch(java.lang.Exception ex) + { + synchronized(this) { - synchronized(this) + if(_instance == null) { - if(_instance == null) - { - return; - } - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "unknown exception in connection monitor thread " + getName() + ":\n" + - sw.toString(); - _instance.initializationData().logger.error(s); + return; } + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "unknown exception in connection monitor:\n" + sw.toString(); + _instance.initializationData().logger.error(s); } } } } private Instance _instance; - private final int _interval; private java.util.HashSet _connections = new java.util.HashSet(); } diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 485ad754cd0..fe06fa2bf0b 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -69,7 +69,7 @@ public final class Instance return _referenceFactory; } - + public synchronized ProxyFactory proxyFactory() { @@ -158,6 +158,22 @@ public final class Instance return _serverThreadPool; } + synchronized public Timer + timer() + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + if(_timer == null) // Lazy initialization. + { + _timer = new Timer(this); + } + + return _timer; + } + public boolean threadPerConnection() { @@ -460,6 +476,7 @@ public final class Instance IceUtil.Assert.FinalizerAssert(_objectAdapterFactory == null); IceUtil.Assert.FinalizerAssert(_clientThreadPool == null); IceUtil.Assert.FinalizerAssert(_serverThreadPool == null); + IceUtil.Assert.FinalizerAssert(_timer == null); IceUtil.Assert.FinalizerAssert(_routerManager == null); IceUtil.Assert.FinalizerAssert(_locatorManager == null); IceUtil.Assert.FinalizerAssert(_endpointFactoryManager == null); @@ -585,7 +602,7 @@ public final class Instance if(_connectionMonitor != null) { - _connectionMonitor._destroy(); + _connectionMonitor.destroy(); _connectionMonitor = null; } @@ -603,6 +620,12 @@ public final class Instance _clientThreadPool = null; } + if(_timer != null) + { + _timer._destroy(); + _timer = null; + } + if(_servantFactoryManager != null) { _servantFactoryManager.destroy(); @@ -725,6 +748,7 @@ public final class Instance private ObjectAdapterFactory _objectAdapterFactory; private ThreadPool _clientThreadPool; private ThreadPool _serverThreadPool; + private Timer _timer; private final boolean _threadPerConnection; private final int _threadPerConnectionStackSize; private EndpointFactoryManager _endpointFactoryManager; diff --git a/java/src/IceInternal/Timer.java b/java/src/IceInternal/Timer.java new file mode 100644 index 00000000000..194eb5b072f --- /dev/null +++ b/java/src/IceInternal/Timer.java @@ -0,0 +1,298 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +interface TimerTask +{ + void run(); +}; + +public final class Timer extends Thread +{ + // + // Renamed from destroy to _destroy to avoid a deprecation warning caused + // by the destroy method inherited from Thread. + // + public void + _destroy() + { + synchronized(this) + { + if(_instance == null) + { + return; + } + + _instance = null; + notify(); + + _tokens.clear(); + _tasks.clear(); + } + + while(true) + { + try + { + join(); + break; + } + catch(java.lang.InterruptedException ex) + { + } + } + } + + synchronized public void + schedule(TimerTask task, long delay) + { + if(_instance == null) + { + return; + } + + final Token token = new Token(System.currentTimeMillis() + delay, ++_tokenId, 0, task); + + Object previous = _tasks.put(task, token); + assert previous == null; + _tokens.add(token); + + if(token.scheduledTime < _wakeUpTime) + { + notify(); + } + } + + synchronized public void + scheduleRepeated(TimerTask task, long period) + { + if(_instance == null) + { + return; + } + + final Token token = new Token(System.currentTimeMillis() + period, ++_tokenId, period, task); + + Object previous = _tasks.put(task, token); + assert previous == null; + _tokens.add(token); + + if(token.scheduledTime < _wakeUpTime) + { + notify(); + } + } + + synchronized public boolean + cancel(TimerTask task) + { + if(_instance == null) + { + return false; + } + + Token token = (Token)_tasks.remove(task); + if(token == null) + { + return false; + } + + _tokens.remove(token); + return true; + } + + // + // Only for use by Instance. + // + Timer(IceInternal.Instance instance) + { + _instance = instance; + + String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); + if(threadName.length() > 0) + { + threadName += "-"; + } + setName(threadName + "Ice.Timer"); + + start(); + } + + protected synchronized void + finalize() + throws Throwable + { + IceUtil.Assert.FinalizerAssert(_instance == null); + + super.finalize(); + } + + public void + run() + { + Token token = null; + while(true) + { + synchronized(this) + { + if(_instance != null) + { + // + // If the task we just ran is a repeated task, schedule it + // again for executation if it wasn't canceled. + // + if(token != null && token.delay > 0) + { + if(_tasks.containsKey(token.task)) + { + token.scheduledTime = System.currentTimeMillis() + token.delay; + _tokens.add(token); + } + } + } + token = null; + + if(_instance == null) + { + break; + } + + if(_tokens.isEmpty()) + { + _wakeUpTime = Long.MAX_VALUE; + while(true) + { + try + { + wait(); + break; + } + catch(java.lang.InterruptedException ex) + { + } + } + } + + if(_instance == null) + { + break; + } + + while(!_tokens.isEmpty() && _instance != null) + { + long now = System.currentTimeMillis(); + Token first = (Token)_tokens.first(); + if(first.scheduledTime <= now) + { + _tokens.remove(first); + token = first; + if(token.delay == 0) + { + _tasks.remove(token.task); + } + break; + } + + _wakeUpTime = first.scheduledTime; + while(true) + { + try + { + wait(first.scheduledTime - now); + break; + } + catch(java.lang.InterruptedException ex) + { + } + } + } + + if(_instance == null) + { + break; + } + } + + if(token != null) + { + try + { + token.task.run(); + } + catch(Exception ex) + { + synchronized(this) + { + if(_instance != null) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "unexpected exception from task run method in timer thread:\n" + sw.toString(); + _instance.initializationData().logger.error(s); + } + } + } + } + } + } + + static private class Token implements Comparable + { + public + Token(long scheduledTime, int id, long delay, TimerTask task) + { + this.scheduledTime = scheduledTime; + this.id = id; + this.delay = delay; + this.task = task; + } + + public int + compareTo(Object o) + { + // + // Token are sorted by scheduled time and token id. + // + Token r = (Token)o; + if(scheduledTime < r.scheduledTime) + { + return -1; + } + else if(scheduledTime > r.scheduledTime) + { + return 1; + } + + if(id < r.id) + { + return -1; + } + else if(id > r.id) + { + return 1; + } + + return 0; + } + + long scheduledTime; + int id; // Since we can't compare references, we need to use another id. + long delay; + TimerTask task; + } + + private final java.util.SortedSet _tokens = new java.util.TreeSet(); + private final java.util.Map _tasks = new java.util.HashMap(); + private Instance _instance; + private long _wakeUpTime = Long.MAX_VALUE; + private int _tokenId = 0; +} |