diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-07-23 15:06:02 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-07-23 15:06:02 -0230 |
commit | 866f9ff17391176b836f9bb49f6da40c2c938441 (patch) | |
tree | 7366963294ef3356c7b887cd89af753988c21beb /java/src | |
parent | adding ACM tests for Python/Ruby/PHP (diff) | |
download | ice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.bz2 ice-866f9ff17391176b836f9bb49f6da40c2c938441.tar.xz ice-866f9ff17391176b836f9bb49f6da40c2c938441.zip |
ICE-4234 - Update Ice to use current Java threading constructs
- Use ScheduledThreadPoolDispatcher not IceUtilInternal.Timer.
- Use Ice timer in glacier2, Freeze impl.
- Align C++, C# with java changes.
- Database demo now supports mariadb.
Diffstat (limited to 'java/src')
23 files changed, 546 insertions, 1179 deletions
diff --git a/java/src/Freeze/BackgroundSaveEvictorI.java b/java/src/Freeze/BackgroundSaveEvictorI.java index adb128e3c87..581f083feda 100644 --- a/java/src/Freeze/BackgroundSaveEvictorI.java +++ b/java/src/Freeze/BackgroundSaveEvictorI.java @@ -43,89 +43,6 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, // static final byte dead = 4; - // - // The WatchDogThread is used by the saving thread to ensure the - // streaming of some object does not take more than timeout ms. - // We only measure the time necessary to acquire the lock on the - // object (servant), not the streaming itself. - // - class WatchDogThread extends Thread - { - WatchDogThread(long timeout, String name) - { - super(name); - _timeout = timeout; - assert timeout > 0; - } - - public synchronized void - run() - { - while(!_done) - { - long startTime = 0; - - try - { - if(_active) - { - startTime = IceInternal.Time.currentMonotonicTimeMillis(); - wait(_timeout); - } - else - { - wait(); - } - } - catch(InterruptedException e) - { - // - // Ignore - // - } - - if(!_done && _active && startTime > 0) - { - // - // Did we timeout? - // - if(IceInternal.Time.currentMonotonicTimeMillis() - startTime >= _timeout) - { - _communicator.getLogger().error(_errorPrefix + - "Fatal error: streaming watch dog thread timed out"); - - Util.handleFatalError(BackgroundSaveEvictorI.this, _communicator, null); - } - } - } - } - - synchronized void - activate() - { - _active = true; - notify(); - } - - synchronized void - deactivate() - { - _active = false; - notify(); - } - - synchronized void - terminate() - { - _done = true; - notify(); - } - - private final long _timeout; - private boolean _done = false; - private boolean _active = false; - } - BackgroundSaveEvictorI(Ice.ObjectAdapter adapter, String envName, String filename, ServantInitializer initializer, Index[] indices, boolean createDb) { @@ -162,6 +79,16 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, } // + // By default, no stream timeout + // + _streamTimeout = + _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".StreamTimeout", 0) * 1000; + if(_streamTimeout > 0) + { + _timer = IceInternal.Util.getInstance(_communicator).timer(); + } + + // // Start threads // String savingThreadName; @@ -175,21 +102,7 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, { savingThreadName = ""; } - String watchDogThreadName = savingThreadName + "FreezeEvictorWatchDogThread(" + envName + '.' + _filename + ")"; savingThreadName += "FreezeEvictorThread(" + envName + '.' + _filename + ")"; - - // - // By default, no stream timeout - // - long streamTimeout = - _communicator.getProperties().getPropertyAsIntWithDefault(propertyPrefix + ".StreamTimeout", 0) * 1000; - - if(streamTimeout > 0) - { - _watchDogThread = new WatchDogThread(streamTimeout, watchDogThreadName); - _watchDogThread.start(); - } - _thread = new Thread(this, savingThreadName); _thread.start(); } @@ -936,19 +849,6 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, { } - if(_watchDogThread != null) - { - _watchDogThread.terminate(); - - try - { - _watchDogThread.join(); - } - catch(InterruptedException ex) - { - } - } - closeDbEnv(); } finally @@ -1084,15 +984,31 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, // Lock servant and then facet so that user can safely lock // servant and call various Evictor operations // - if(_watchDogThread != null) + java.util.concurrent.Future<?> future = null; + if(_timer != null) { - _watchDogThread.activate(); + // + // The timer is used to ensure the streaming of some object does not take more than + // timeout ms. We only measure the time necessary to acquire the lock on the object + // (servant), not the streaming itself. + // + future = _timer.schedule(new Runnable() + { + public void run() + { + _communicator.getLogger().error(_errorPrefix + + "Fatal error: streaming watch dog timed out"); + + Util.handleFatalError(BackgroundSaveEvictorI.this, _communicator, null); + } + }, _streamTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); } synchronized(servant) { - if(_watchDogThread != null) + if(future != null) { - _watchDogThread.deactivate(); + future.cancel(false); + future = null; } synchronized(element) @@ -1526,7 +1442,8 @@ class BackgroundSaveEvictorI extends EvictorI implements BackgroundSaveEvictor, private java.util.List<EvictorElement> _modifiedQueue = new java.util.ArrayList<EvictorElement>(); private boolean _savingThreadDone = false; - private WatchDogThread _watchDogThread = null; + private java.util.concurrent.ScheduledExecutorService _timer; + private long _streamTimeout; // // Threads that have requested a "saveNow" and are waiting for diff --git a/java/src/Glacier2/Application.java b/java/src/Glacier2/Application.java index 2195eeb83c7..edc6cc9a214 100644 --- a/java/src/Glacier2/Application.java +++ b/java/src/Glacier2/Application.java @@ -134,8 +134,9 @@ public abstract class Application extends Ice.Application /** * Called when the session refresh thread detects that the session has been * destroyed. A subclass can override this method to take action after the - * loss of connectivity with the Glacier2 router. This method is always - * called from the session refresh thread. + * loss of connectivity with the Glacier2 router. This method is called + * according to the Ice invocation dipsatch rules (in other words, it + * uses the same rules as an servant upcall or AMI callback). **/ public void sessionDestroyed() @@ -231,93 +232,6 @@ public abstract class Application extends Ice.Application return _adapter; } - private class SessionPingThread extends Thread - { - SessionPingThread(Glacier2.RouterPrx router, long period) - { - _router = router; - _period = period; - _done = false; - } - - synchronized public void - run() - { - while(true) - { - try - { - _router.begin_refreshSession(new Glacier2.Callback_Router_refreshSession() - { - public void - response() - { - } - - public void - exception(Ice.LocalException ex) - { - // - // Here the session has gone. The thread terminates, and we notify the - // application that the session has been destroyed. - // - done(); - sessionDestroyed(); - } - - public void - exception(Ice.UserException ex) - { - // - // Here the session has gone. The thread terminates, and we notify the - // application that the session has been destroyed. - // - done(); - sessionDestroyed(); - } - }); - } - catch(Ice.CommunicatorDestroyedException ex) - { - // - // AMI requests can raise CommunicatorDestroyedException directly. - // - break; - } - - if(!_done) - { - try - { - wait(_period); - } - catch(InterruptedException ex) - { - } - } - - if(_done) - { - break; - } - } - } - - public synchronized void - done() - { - if(!_done) - { - _done = true; - notify(); - } - } - - private final Glacier2.RouterPrx _router; - private final long _period; - private boolean _done = false; - } - private class ConnectionCallbackI implements Ice.ConnectionCallback { public void heartbeat(Ice.Connection con) @@ -373,7 +287,7 @@ public abstract class Application extends Ice.Application boolean restart = false; status.value = 0; - SessionPingThread ping = null; + try { _communicator = Ice.Util.initialize(argHolder, initData); @@ -432,8 +346,50 @@ public abstract class Application extends Ice.Application long timeout = _router.getSessionTimeout(); if(timeout > 0) { - ping = new SessionPingThread(_router, (timeout * 1000) / 2); - ping.start(); + java.util.concurrent.ScheduledExecutorService timer = + IceInternal.Util.getInstance(_communicator).timer(); + // + // We don't need to cancel the task as the communicator is destroyed at the end and + // ContinueExistingPeriodicTasksAfterShutdownPolicy is false. + // + timer.scheduleAtFixedRate(new Runnable() + { + public void run() + { + _router.begin_refreshSession(new Glacier2.Callback_Router_refreshSession() + { + public void + response() + { + } + + public void + exception(Ice.LocalException ex) + { + // + // Here the session has gone and we notify the application that + // the session has been destroyed. + // + sessionDestroyed(); + } + + public void + exception(Ice.UserException ex) + { + // + // Here the session has gone and we notify the application that + // the session has been destroyed. + // + sessionDestroyed(); + } + }); + // + // AMI requests can raise CommunicatorDestroyedException directly. We let this + // out of the task and terminate the timer. + // + + } + }, timeout / 2, timeout / 2, java.util.concurrent.TimeUnit.SECONDS); } } _category = _router.getCategoryForClient(); @@ -531,23 +487,6 @@ public abstract class Application extends Ice.Application } } - if(ping != null) - { - ping.done(); - while(true) - { - try - { - ping.join(); - break; - } - catch(InterruptedException ex) - { - } - } - ping = null; - } - if(_createdSession && _router != null) { try diff --git a/java/src/Glacier2/SessionHelper.java b/java/src/Glacier2/SessionHelper.java index 2afe4493e4a..0e5e0abe748 100644 --- a/java/src/Glacier2/SessionHelper.java +++ b/java/src/Glacier2/SessionHelper.java @@ -14,82 +14,6 @@ package Glacier2; */ public class SessionHelper { - private class SessionRefreshThread extends Thread - { - SessionRefreshThread(Glacier2.RouterPrx router, long period) - { - _router = router; - _period = period; - _done = false; - } - - synchronized public void - run() - { - while(true) - { - try - { - _router.begin_refreshSession(new Glacier2.Callback_Router_refreshSession() - { - public void response() - { - } - - public void exception(Ice.LocalException ex) - { - done(); - SessionHelper.this.destroy(); - } - - public void exception(Ice.UserException ex) - { - done(); - SessionHelper.this.destroy(); - } - }); - } - catch(Ice.CommunicatorDestroyedException ex) - { - // - // AMI requests can raise CommunicatorDestroyedException directly. - // - break; - } - - if(!_done) - { - try - { - wait(_period); - } - catch(InterruptedException ex) - { - } - } - - if(_done) - { - break; - } - } - } - - synchronized public void - done() - { - if(!_done) - { - _done = true; - notify(); - } - } - - private final Glacier2.RouterPrx _router; - private final long _period; - private boolean _done = false; - } - private class ConnectionCallbackI implements Ice.ConnectionCallback { public ConnectionCallbackI(SessionHelper sessionHelper) @@ -381,7 +305,6 @@ public class SessionHelper _session = session; _connected = true; - assert _refreshThread == null; if(acmTimeout > 0) { Ice.Connection connection = _router.ice_getCachedConnection(); @@ -393,8 +316,38 @@ public class SessionHelper } else if(sessionTimeout > 0) { - _refreshThread = new SessionRefreshThread(_router, (sessionTimeout * 1000)/2); - _refreshThread.start(); + java.util.concurrent.ScheduledExecutorService timer = + IceInternal.Util.getInstance(_communicator).timer(); + // + // We don't need to cancel the task as the communicator is destroyed at the end and + // ContinueExistingPeriodicTasksAfterShutdownPolicy is false. + // + timer.scheduleAtFixedRate(new Runnable() + { + public void run() + { + _router.begin_refreshSession(new Glacier2.Callback_Router_refreshSession() + { + public void response() + { + } + + public void exception(Ice.LocalException ex) + { + SessionHelper.this.destroy(); + } + + public void exception(Ice.UserException ex) + { + SessionHelper.this.destroy(); + } + }); + // + // AMI requests can raise CommunicatorDestroyedException directly. We let this + // out of the task and terminate the timer. + // + } + }, sessionTimeout / 2, sessionTimeout / 2, java.util.concurrent.TimeUnit.SECONDS); } _shutdownHook = new Thread("Shutdown hook") @@ -445,7 +398,6 @@ public class SessionHelper assert _destroy; Glacier2.RouterPrx router = null; Ice.Communicator communicator = null; - SessionRefreshThread refreshThread = null; synchronized(this) { if(_router == null) @@ -456,9 +408,6 @@ public class SessionHelper router = _router; _router = null; - refreshThread = _refreshThread; - _refreshThread = null; - communicator = _communicator; _communicator = null; } @@ -489,22 +438,6 @@ public class SessionHelper communicator.getLogger().warning("SessionHelper: unexpected exception when destroying the session:\n" + e); } - if(refreshThread != null) - { - refreshThread.done(); - while(true) - { - try - { - refreshThread.join(); - break; - } - catch(InterruptedException e) - { - } - } - } - try { communicator.destroy(); @@ -639,7 +572,6 @@ public class SessionHelper private Glacier2.SessionPrx _session; private String _category; - private SessionRefreshThread _refreshThread; private final SessionCallback _callback; private boolean _destroy = false; private boolean _connected = false; diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 8667a1afb88..de8c225d910 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -517,6 +517,7 @@ public class AsyncResult protected IceInternal.BasicStream _os; protected IceInternal.RequestHandler _timeoutRequestHandler; + protected java.util.concurrent.Future<?> _future; protected static final byte OK = 0x1; protected static final byte Done = 0x2; diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index b84064da6ba..bc6fa849e22 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -19,10 +19,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne void connectionStartFailed(ConnectionI connection, Ice.LocalException ex); } - private class TimeoutCallback implements IceInternal.TimerTask + private class TimeoutCallback implements Runnable { public void - runTimerTask() + run() { timedOut(); } @@ -1772,9 +1772,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); _writeTimeout = new TimeoutCallback(); - _writeTimeoutScheduled = false; + _writeTimeoutFuture = null; _readTimeout = new TimeoutCallback(); - _readTimeoutScheduled = false; + _readTimeoutFuture = null; _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0; _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; _cacheBuffers = instance.cacheMessageBuffers(); @@ -2874,21 +2874,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if((status & IceInternal.SocketOperation.Read) != 0) { - if(_readTimeoutScheduled) + if(_readTimeoutFuture != null) { - _timer.cancel(_readTimeout); + _readTimeoutFuture.cancel(false); } - _timer.schedule(_readTimeout, timeout); - _readTimeoutScheduled = true; + _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, + java.util.concurrent.TimeUnit.MILLISECONDS); } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) { - if(_writeTimeoutScheduled) + if(_writeTimeoutFuture != null) { - _timer.cancel(_writeTimeout); + _writeTimeoutFuture.cancel(false); } - _timer.schedule(_writeTimeout, timeout); - _writeTimeoutScheduled = true; + _writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout, + java.util.concurrent.TimeUnit.MILLISECONDS); } } catch(Throwable ex) @@ -2900,16 +2900,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void unscheduleTimeout(int status) { - if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutScheduled) + if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null) { - _timer.cancel(_readTimeout); - _readTimeoutScheduled = false; + _readTimeoutFuture.cancel(false); + _readTimeoutFuture = null; } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0 && - _writeTimeoutScheduled) + _writeTimeoutFuture != null) { - _timer.cancel(_writeTimeout); - _writeTimeoutScheduled = false; + _writeTimeoutFuture.cancel(false); + _writeTimeoutFuture = null; } } @@ -3159,11 +3159,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private final IceInternal.TraceLevels _traceLevels; private final IceInternal.ThreadPool _threadPool; - private final IceInternal.Timer _timer; - private final IceInternal.TimerTask _writeTimeout; - private boolean _writeTimeoutScheduled; - private final IceInternal.TimerTask _readTimeout; - private boolean _readTimeoutScheduled; + private final java.util.concurrent.ScheduledExecutorService _timer; + private final Runnable _writeTimeout; + private java.util.concurrent.Future<?> _writeTimeoutFuture; + private final Runnable _readTimeout; + private java.util.concurrent.Future<?> _readTimeoutFuture; private StartCallback _startCallback = null; diff --git a/java/src/IceDiscovery/LookupI.java b/java/src/IceDiscovery/LookupI.java index b7027ffadd7..56b7c1383c1 100644 --- a/java/src/IceDiscovery/LookupI.java +++ b/java/src/IceDiscovery/LookupI.java @@ -16,54 +16,69 @@ import java.util.HashMap; class LookupI extends _LookupDisp { - abstract private class Request<T, AmdCB> implements IceInternal.TimerTask + abstract private class Request<T, AmdCB> implements Runnable { - public Request(T id, int retryCount) + Request(T id, int retryCount) { _id = id; _nRetry = retryCount; } - public T + T getId() { return _id; } - - public boolean + + boolean addCallback(AmdCB cb) { _callbacks.add(cb); return _callbacks.size() == 1; } - public boolean + boolean retry() { return --_nRetry >= 0; } + void + scheduleTimer(long timeout) + { + _future = _timer.schedule(this, timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + } + + void + cancelTimer() + { + assert _future != null; + _future.cancel(false); + _future = null; + } + protected int _nRetry; protected List<AmdCB> _callbacks = new ArrayList<AmdCB>(); private T _id; + protected java.util.concurrent.Future<?> _future; }; private class AdapterRequest extends Request<String, Ice.AMD_Locator_findAdapterById> { - public AdapterRequest(String id, int retryCount) + AdapterRequest(String id, int retryCount) { super(id, retryCount); _start = System.nanoTime(); _latency = 0; } - public boolean + boolean retry() { return _proxies.size() == 0 && --_nRetry >= 0; } - - public boolean + + boolean response(Ice.ObjectPrx proxy, boolean isReplicaGroup) { if(isReplicaGroup) @@ -76,8 +91,8 @@ class LookupI extends _LookupDisp { _latency = 1; // 1ms } - _timer.cancel(this); - _timer.schedule(this, _latency); + cancelTimer(); + scheduleTimer(_latency); } return false; } @@ -85,7 +100,7 @@ class LookupI extends _LookupDisp return true; } - public void + void finished(Ice.ObjectPrx proxy) { if(proxy != null || _proxies.isEmpty()) @@ -111,9 +126,9 @@ class LookupI extends _LookupDisp } sendResponse(result.ice_endpoints(endpoints.toArray(new Ice.Endpoint[endpoints.size()]))); } - - public void - runTimerTask() + + public void + run() { adapterRequestTimedOut(this); } @@ -135,19 +150,18 @@ class LookupI extends _LookupDisp private class ObjectRequest extends Request<Ice.Identity, Ice.AMD_Locator_findObjectById> { - public ObjectRequest(Ice.Identity id, int retryCount) { super(id, retryCount); } - public void + void response(Ice.ObjectPrx proxy) { finished(proxy); } - - public void + + void finished(Ice.ObjectPrx proxy) { for(Ice.AMD_Locator_findObjectById cb : _callbacks) @@ -157,7 +171,8 @@ class LookupI extends _LookupDisp _callbacks.clear(); } - public void runTimerTask() + public void + run() { objectRequestTimedOut(this); } @@ -230,7 +245,7 @@ class LookupI extends _LookupDisp if(request.addCallback(cb)) { _lookup.begin_findObjectById(_domainId, id, _lookupReply); - _timer.schedule(request, _timeout); + request.scheduleTimer(_timeout); } } @@ -247,7 +262,7 @@ class LookupI extends _LookupDisp if(request.addCallback(cb)) { _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply); - _timer.schedule(request, _timeout); + request.scheduleTimer(_timeout); } } @@ -261,7 +276,7 @@ class LookupI extends _LookupDisp } request.response(proxy); - _timer.cancel(request); + request.cancelTimer(); _objectRequests.remove(id); } @@ -276,7 +291,7 @@ class LookupI extends _LookupDisp if(request.response(proxy, isReplicaGroup)) { - _timer.cancel(request); + request.cancelTimer(); _adapterRequests.remove(adapterId); } } @@ -293,7 +308,7 @@ class LookupI extends _LookupDisp if(request.retry()) { _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply); - _timer.schedule(request, _timeout); + request.scheduleTimer(_timeout); } else { @@ -314,7 +329,7 @@ class LookupI extends _LookupDisp if(request.retry()) { _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply); - _timer.schedule(request, _timeout); + request.scheduleTimer(_timeout); } else { @@ -331,7 +346,7 @@ class LookupI extends _LookupDisp private final int _latencyMultiplier; private final String _domainId; - private final IceInternal.Timer _timer; + private final java.util.concurrent.ScheduledExecutorService _timer; private Map<Ice.Identity, ObjectRequest> _objectRequests = new HashMap<Ice.Identity, ObjectRequest>(); private Map<String, AdapterRequest> _adapterRequests = new HashMap<String, AdapterRequest>(); diff --git a/java/src/IceGrid/DiscoveryPluginI.java b/java/src/IceGrid/DiscoveryPluginI.java index cf9cfa66a29..6dea39c4e89 100644 --- a/java/src/IceGrid/DiscoveryPluginI.java +++ b/java/src/IceGrid/DiscoveryPluginI.java @@ -15,28 +15,25 @@ import java.util.ArrayList; class DiscoveryPluginI implements Ice.Plugin { - abstract private class Request { - public Request(LocatorI locator) { _locator = locator; } - abstract public void + abstract void invoke(Ice.LocatorPrx locator); - abstract public void + abstract void response(Ice.ObjectPrx locator); - protected LocatorI _locator; - protected Ice.LocatorPrx _locatorPrx; + LocatorI _locator; + Ice.LocatorPrx _locatorPrx; }; - private class LocatorI extends Ice._LocatorDisp implements IceInternal.TimerTask + private class LocatorI extends Ice._LocatorDisp { - public LocatorI(LookupPrx lookup, Ice.Properties properties) { _lookup = lookup; @@ -58,13 +55,13 @@ class DiscoveryPluginI implements Ice.Plugin public synchronized void findObjectById_async(Ice.AMD_Locator_findObjectById amdCB, Ice.Identity id, Ice.Current curr) { - ((LocatorI)this).invoke(null, new ObjectRequest((LocatorI)this, id, amdCB)); + invoke(null, new ObjectRequest(this, id, amdCB)); } public synchronized void findAdapterById_async(Ice.AMD_Locator_findAdapterById amdCB, String adapterId, Ice.Current curr) { - ((LocatorI)this).invoke(null, new AdapterRequest((LocatorI)this, adapterId, amdCB)); + invoke(null, new AdapterRequest(this, adapterId, amdCB)); } @@ -74,12 +71,12 @@ class DiscoveryPluginI implements Ice.Plugin Ice.LocatorPrx locator; if(_locator != null) { - ((LocatorI)this).queueRequest(null); // Search for locator if not already doing so. + queueRequest(null); // Search for locator if not already doing so. while(_pendingRetryCount > 0) { try { - wait(); + wait(); } catch(java.lang.InterruptedException ex) { @@ -121,7 +118,9 @@ class DiscoveryPluginI implements Ice.Plugin if(_pendingRetryCount > 0) // No need to retry, we found a locator { - _timer.cancel(this); + _future.cancel(false); + _future = null; + _pendingRetryCount = 0; } @@ -131,7 +130,8 @@ class DiscoveryPluginI implements Ice.Plugin // We found another locator replica, append its endpoints to the // current locator proxy endpoints. // - List<Ice.Endpoint> newEndpoints = new ArrayList<Ice.Endpoint>(Arrays.asList(_locator.ice_getEndpoints())); + List<Ice.Endpoint> newEndpoints = new ArrayList<Ice.Endpoint>( + Arrays.asList(_locator.ice_getEndpoints())); for(Ice.Endpoint p : locator.ice_getEndpoints()) { // @@ -152,7 +152,8 @@ class DiscoveryPluginI implements Ice.Plugin } } - _locator = (LocatorPrx)_locator.ice_endpoints(newEndpoints.toArray(new Ice.Endpoint[newEndpoints.size()])); + _locator = (LocatorPrx)_locator.ice_endpoints( + newEndpoints.toArray(new Ice.Endpoint[newEndpoints.size()])); } else { @@ -178,66 +179,64 @@ class DiscoveryPluginI implements Ice.Plugin public synchronized void invoke(Ice.LocatorPrx locator, Request request) { - if(_locator != null && !(_locator.equals(locator))) { - request.invoke(_locator); } else { - _locator = null; queueRequest(request); } } - public void runTimerTask() + private Runnable _retryTask = new Runnable() { - synchronized(this) + public void run() { - - if(--_pendingRetryCount > 0) + synchronized(LocatorI.this) { - - _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer.schedule(this, _timeout); - } - else - { - assert !_pendingRequests.isEmpty(); - for(Request req : _pendingRequests) + if(--_pendingRetryCount > 0) { - req.response(null); + _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); } - _pendingRequests.clear(); - notifyAll(); + else + { + assert !_pendingRequests.isEmpty(); + for(Request req : _pendingRequests) + { + req.response(null); + } + _pendingRequests.clear(); + notifyAll(); + } } + } - } + }; private void queueRequest(Request request) { if(request != null) { - _pendingRequests.add(request); } if(_pendingRetryCount == 0) // No request in progress { - _pendingRetryCount = _retryCount; _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer.schedule(this, _timeout); + _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); } } private final LookupPrx _lookup; private final int _timeout; - private final IceInternal.Timer _timer; + private java.util.concurrent.Future<?> _future; + private final java.util.concurrent.ScheduledExecutorService _timer; private final int _retryCount; private String _instanceName; @@ -251,7 +250,7 @@ class DiscoveryPluginI implements Ice.Plugin private class LookupReplyI extends _LookupReplyDisp { - public LookupReplyI(LocatorI locator) + LookupReplyI(LocatorI locator) { _locator = locator; } @@ -267,7 +266,6 @@ class DiscoveryPluginI implements Ice.Plugin class ObjectRequest extends Request { - public ObjectRequest(LocatorI locator, Ice.Identity id, Ice.AMD_Locator_findObjectById amdCB) { super(locator); @@ -275,7 +273,7 @@ class DiscoveryPluginI implements Ice.Plugin _amdCB = amdCB; } - public void + void invoke(Ice.LocatorPrx l) { _locatorPrx = l; @@ -301,13 +299,13 @@ class DiscoveryPluginI implements Ice.Plugin }); } - public void + void response(Ice.ObjectPrx prx) { _amdCB.ice_response(prx); } - public void + void exception(Exception ex) { _locator.invoke(_locatorPrx, this); @@ -317,16 +315,16 @@ class DiscoveryPluginI implements Ice.Plugin private final Ice.AMD_Locator_findObjectById _amdCB; }; - class AdapterRequest extends Request { + class AdapterRequest extends Request + { - public AdapterRequest(LocatorI locator, String adapterId, Ice.AMD_Locator_findAdapterById amdCB) { super(locator); _adapterId = adapterId; _amdCB = amdCB; } - public void + void invoke(Ice.LocatorPrx l) { _locatorPrx = l; @@ -353,13 +351,13 @@ class DiscoveryPluginI implements Ice.Plugin }); } - public void + void response(Ice.ObjectPrx prx) { _amdCB.ice_response(prx); } - public void + void exception(Exception ex) { _locator.invoke(_locatorPrx, this); // Retry with new locator proxy. @@ -384,7 +382,7 @@ class DiscoveryPluginI implements Ice.Plugin String address; if(ipv4) { - address = properties.getPropertyWithDefault("IceGridDiscovery.Address", "239.255.0.1"); + address = properties.getPropertyWithDefault("IceGridDiscovery.Address", "239.255.0.1"); } else { diff --git a/java/src/IceGridGUI/Coordinator.java b/java/src/IceGridGUI/Coordinator.java index 911b05a388d..2f861b4f8cc 100644 --- a/java/src/IceGridGUI/Coordinator.java +++ b/java/src/IceGridGUI/Coordinator.java @@ -3150,6 +3150,20 @@ public class Coordinator _liveDeploymentPane = new LiveDeploymentPane(_liveDeploymentRoot); _mainPane = new MainPane(this); _mainFrame.getContentPane().add(_mainPane, BorderLayout.CENTER); + + java.util.concurrent.ScheduledThreadPoolExecutor executor = + new java.util.concurrent.ScheduledThreadPoolExecutor(1, + new java.util.concurrent.ThreadFactory() + { + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + t.setName("Pinger"); + return t; + } + }); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + _executor = executor; } public IGraphView createGraphView() @@ -3349,6 +3363,10 @@ public class Coordinator destroyIceGridAdmin(); destroyCommunicator(); destroyWizardCommunicator(); + + _executor.shutdown(); + _executor = null; + Runtime.getRuntime().removeShutdownHook(_shutdownHook); _mainFrame.dispose(); Runtime.getRuntime().exit(status); @@ -3648,6 +3666,11 @@ public class Coordinator return _graphViews.toArray(new IGraphView[_graphViews.size()]); } + public java.util.concurrent.ScheduledExecutorService getExecutor() + { + return _executor; + } + // // May run in any thread // @@ -3896,5 +3919,7 @@ public class Coordinator private java.util.List<IGraphView> _graphViews = new java.util.ArrayList<IGraphView>(); + private java.util.concurrent.ScheduledExecutorService _executor; + static private final int HISTORY_MAX_SIZE = 20; } diff --git a/java/src/IceGridGUI/LiveDeployment/GraphView.java b/java/src/IceGridGUI/LiveDeployment/GraphView.java index 9ee26b7cc96..3d006331e2a 100644 --- a/java/src/IceGridGUI/LiveDeployment/GraphView.java +++ b/java/src/IceGridGUI/LiveDeployment/GraphView.java @@ -131,95 +131,6 @@ import java.util.prefs.BackingStoreException; public class GraphView extends JFrame implements MetricsFieldContext, Coordinator.IGraphView { - class WorkQueue extends Thread - { - private class WorkItem - { - public WorkItem(Runnable runnable, boolean javafx) - { - this.runnable = runnable; - this.javafx = javafx; - } - - Runnable runnable; - boolean javafx; - } - - public void run() - { - while(true) - { - WorkItem item = null; - synchronized(this) - { - while(_queue.isEmpty()) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - assert !_queue.isEmpty(); - item = _queue.remove(0); - } - - final java.util.concurrent.Semaphore sem = new java.util.concurrent.Semaphore(0); - final Runnable r = item.runnable; - if(item.javafx) - { - Platform.runLater(new Runnable() - { - @Override - public void run() - { - try - { - r.run(); - } - finally - { - sem.release(); - } - } - }); - } - else - { - SwingUtilities.invokeLater(new Runnable() - { - @Override - public void run() - { - try - { - r.run(); - } - finally - { - sem.release(); - } - } - }); - } - sem.acquireUninterruptibly(); - } - } - - synchronized public void enqueue(Runnable runnable, boolean javafx) - { - if(_queue.isEmpty()) - { - notify(); - } - _queue.add(new WorkItem(runnable, javafx)); - } - - private java.util.List<WorkItem> _queue = new java.util.LinkedList<WorkItem>(); - } - class TimeFormatter extends StringConverter<java.lang.Number> { TimeFormatter(String format) @@ -318,99 +229,9 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } } - private class RefreshThread extends Thread - { - RefreshThread(int period) - { - _period = period; - _done = false; - } - - synchronized void setRefreshPeriod(int period) - { - _period = period; - notify(); - } - - synchronized public void - run() - { - while(true) - { - java.util.Set<MetricsViewInfo> metrics = null; - synchronized(GraphView.this) - { - metrics = new java.util.HashSet<MetricsViewInfo>(_series.keySet()); - } - - for(final MetricsViewInfo m : metrics) - { - IceMX.Callback_MetricsAdmin_getMetricsView cb = new IceMX.Callback_MetricsAdmin_getMetricsView() - { - public void response(final java.util.Map<java.lang.String, IceMX.Metrics[]> data, - long timestamp) - { - addData(m, data, timestamp); - } - - public void exception(final Ice.LocalException e) - { - addData(m, null, 0); - } - - public void exception(final Ice.UserException e) - { - addData(m, null, 0); - } - }; - try - { - m.admin.begin_getMetricsView(m.view, cb); - } - catch(Ice.LocalException e) - { - addData(m, null, 0); - } - } - if(!_done) - { - try - { - wait(_period * 1000); - } - catch(InterruptedException ex) - { - } - } - - if(_done) - { - break; - } - } - } - - synchronized public void - done() - { - if(!_done) - { - _done = true; - notify(); - } - } - - private int _period; - private boolean _done = false; - } - public GraphView(Coordinator coordinator, String title) { _coordinator = coordinator; - _queue = new WorkQueue(); - _queue.setDaemon(true); - _queue.start(); - setTitle(title); _preferences = Preferences.userNodeForPackage(getClass()); @@ -575,7 +396,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato _series.remove(row.info); if(_series.size() == 0) { - stopRefreshThread(); + stopRefresh(); } } } @@ -585,7 +406,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Remove series from the chart, in JavaFx thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -616,7 +437,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } } } - }, true); + }); } }; delete.setEnabled(false); @@ -716,7 +537,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // initialize the scene in JavaFX thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -773,7 +594,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato }); fxPanel.setScene(scene); } - }, true); + }); pack(); if(!loadPreferences()) @@ -842,7 +663,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Remove series from the chart, in JavaFx thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -874,7 +695,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } } } - }, true); + }); } private boolean showInfo() @@ -985,7 +806,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato public void close() { storePreferences(); - stopRefreshThread(); + stopRefresh(); setVisible(false); _coordinator.removeGraphView(GraphView.this); dispose(); @@ -996,7 +817,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Must run in JavaFX thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1059,7 +880,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Must run in Swing thread. // - _queue.enqueue(new Runnable() + enqueueSwing(new Runnable() { public void run() { @@ -1070,7 +891,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato _legendTable.setRowSelectionInterval(i, i); } } - }, false); + }); } } }); @@ -1078,22 +899,22 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Add the serie to the legend, must run in Swing thread. // - _queue.enqueue(new Runnable() + enqueueSwing(new Runnable() { public void run() { _legendModel.addRow(row); } - }, false); + }); } } } if(_chart.getData().size() > 0) { - startRefreshThread(); + startRefresh(); } } - }, true); + }); } // @@ -1141,7 +962,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Must run in Swing thread. // - _queue.enqueue(new Runnable() + enqueueSwing(new Runnable() { public void run() { @@ -1152,7 +973,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato _legendTable.setRowSelectionInterval(i, i); } } - }, false); + }); } } }); @@ -1165,7 +986,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Update the graph series in JavaFX thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1268,7 +1089,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Fire an event on the legend model to update all cells. // - _queue.enqueue(new Runnable() + enqueueSwing(new Runnable() { public void run() { @@ -1277,9 +1098,9 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato TableModelEvent.ALL_COLUMNS, TableModelEvent.UPDATE)); } - }, false); + }); } - }, true); + }); } int seriesSize(MetricsRow row) @@ -1327,21 +1148,60 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } } - synchronized private void startRefreshThread() + synchronized private void startRefresh() { - if(_refreshThread == null) + if(_refreshFuture == null) { - _refreshThread = new RefreshThread(getRefreshPeriod()); - _refreshThread.start(); + _refreshFuture = _coordinator.getExecutor().scheduleAtFixedRate(new Runnable() + { + public void run() + { + java.util.Set<MetricsViewInfo> metrics = null; + synchronized(GraphView.this) + { + metrics = new java.util.HashSet<MetricsViewInfo>(_series.keySet()); + } + + for(final MetricsViewInfo m : metrics) + { + IceMX.Callback_MetricsAdmin_getMetricsView cb = new IceMX.Callback_MetricsAdmin_getMetricsView() + { + public void response(final java.util.Map<java.lang.String, IceMX.Metrics[]> data, + long timestamp) + { + addData(m, data, timestamp); + } + + public void exception(final Ice.LocalException e) + { + addData(m, null, 0); + } + + public void exception(final Ice.UserException e) + { + addData(m, null, 0); + } + }; + try + { + m.admin.begin_getMetricsView(m.view, cb); + } + catch(Ice.LocalException e) + { + addData(m, null, 0); + } + } + } + }, _refreshPeriod, _refreshPeriod, java.util.concurrent.TimeUnit.SECONDS); } } - synchronized private void stopRefreshThread() + synchronized private void stopRefresh() { - if(_refreshThread != null) + if(_refreshFuture != null) { - _refreshThread.done(); - _refreshThread = null; + _refreshFuture.cancel(false); + _refreshFuture = null; } } @@ -1358,10 +1218,14 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } _refreshPeriod = refreshPeriod; - if(_refreshThread != null) + if(_refreshFuture != null) { - _refreshThread.setRefreshPeriod(_refreshPeriod); + _refreshFuture.cancel(false); + _refreshFuture = null; + + startRefresh(); } + } synchronized String getDateFormat() @@ -1376,14 +1240,14 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Update the horizontal axis label, in JavaFx thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() { _xAxis.setLabel("Time (" + getDateFormat() + ")"); } - }, true); + }); } synchronized private void setMaximumSamples(final int samples) @@ -1399,7 +1263,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // If maximum samples change, we remove older samples. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1410,7 +1274,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato adjustSize(row); } } - }, true); + }); } else { @@ -1617,7 +1481,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato if(_columnNames[columnIndex].equals("Show")) { row.visible = ((Boolean)value).booleanValue(); - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1627,7 +1491,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato setNodesVisible(getSeriesClass(row.series.get(i)), row.visible); } } - }, true); + }); } else if(_columnNames[columnIndex].equals("Scale")) { @@ -1720,7 +1584,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Must run in JavaFX thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1730,7 +1594,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato i.setYValue(i.getYValue().doubleValue() * s2 / s1); } } - }, true); + }); } void @@ -1739,7 +1603,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato // // Must run in JavaFX thread. // - _queue.enqueue(new Runnable() + enqueueJFX(new Runnable() { @Override public void run() @@ -1751,7 +1615,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato setNodesStyle(styleClass); } } - }, true); + }); } // @@ -1824,6 +1688,56 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato _styles.put(seriesClass, sb.toString()); } + private void enqueueJFX(final Runnable runnable) { + _queue.submit(new Runnable() + { + public void run() + { + Platform.runLater(new Runnable() + { + @Override + public void run() + { + try + { + runnable.run(); + } + finally + { + _sem.release(); + } + } + }); + _sem.acquireUninterruptibly(); + } + }); + } + + private void enqueueSwing(final Runnable runnable) { + _queue.submit(new Runnable() + { + public void run() + { + SwingUtilities.invokeLater(new Runnable() + { + @Override + public void run() + { + try + { + runnable.run(); + } + finally + { + _sem.release(); + } + } + }); + _sem.acquireUninterruptibly(); + } + }); + } + static class DecimalRenderer extends DefaultListCellRenderer { public DecimalRenderer(ListCellRenderer renderer) @@ -1953,7 +1867,7 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato } private final Coordinator _coordinator; - private RefreshThread _refreshThread; + private java.util.concurrent.Future<?> _refreshFuture; private final static int _minRefreshPeriod = 1; // 1 seconds private final static int _maxRefreshPeriod = 60 * 60; // 3600 seconds = 1 hour. @@ -2043,7 +1957,19 @@ public class GraphView extends JFrame implements MetricsFieldContext, Coordinato 10000000.0d, 100000000.0d, 1000000000.0d}; - private final WorkQueue _queue; + + private final java.util.concurrent.Semaphore _sem = new java.util.concurrent.Semaphore(0); + private final java.util.concurrent.ExecutorService _queue = java.util.concurrent.Executors.newSingleThreadExecutor( + new java.util.concurrent.ThreadFactory() + { + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("GraphView-Thread"); + return t; + } + }); private final Preferences _preferences; private final static DataFormat LocalObjectMimeType = new DataFormat("application/x-java-jvm-local-objectref"); diff --git a/java/src/IceGridGUI/LiveDeployment/MetricsView.java b/java/src/IceGridGUI/LiveDeployment/MetricsView.java index da866f642f1..0ce60746390 100644 --- a/java/src/IceGridGUI/LiveDeployment/MetricsView.java +++ b/java/src/IceGridGUI/LiveDeployment/MetricsView.java @@ -99,7 +99,7 @@ class MetricsView extends TreeNode // If the metrics view is selected when enabled success, // we must start the refresh thread to pull updates. // - MetricsViewEditor.startRefreshThread(MetricsView.this); + MetricsViewEditor.startRefresh(MetricsView.this); } } }); @@ -107,7 +107,7 @@ class MetricsView extends TreeNode public void exception(final Ice.LocalException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -133,7 +133,7 @@ class MetricsView extends TreeNode public void exception(final Ice.UserException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -168,7 +168,7 @@ class MetricsView extends TreeNode // If the metrics view is selected when disabled success, // we stop the refresh. // - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); } } }); @@ -176,7 +176,7 @@ class MetricsView extends TreeNode public void exception(final Ice.LocalException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -202,7 +202,7 @@ class MetricsView extends TreeNode public void exception(final Ice.UserException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -288,7 +288,7 @@ class MetricsView extends TreeNode public void exception(final Ice.LocalException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -318,7 +318,7 @@ class MetricsView extends TreeNode public void exception(final Ice.UserException e) { - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); SwingUtilities.invokeLater(new Runnable() { public void run() @@ -340,7 +340,7 @@ class MetricsView extends TreeNode } catch(Ice.LocalException e) { - _editor.stopRefreshThread(); + _editor.stopRefresh(); JOptionPane.showMessageDialog(getCoordinator().getMainFrame(), "Error: " + e.toString(), "Error", JOptionPane.ERROR_MESSAGE); } diff --git a/java/src/IceGridGUI/LiveDeployment/MetricsViewEditor.java b/java/src/IceGridGUI/LiveDeployment/MetricsViewEditor.java index bc3ccc66f1f..a1e02833a58 100644 --- a/java/src/IceGridGUI/LiveDeployment/MetricsViewEditor.java +++ b/java/src/IceGridGUI/LiveDeployment/MetricsViewEditor.java @@ -89,54 +89,6 @@ import IceGridGUI.*; public class MetricsViewEditor extends Editor implements MetricsFieldContext { - private static class RefreshThread extends Thread - { - RefreshThread(long period, MetricsView node) - { - _period = period; - _node = node; - _done = false; - } - - synchronized public void - run() - { - while(true) - { - _node.fetchMetricsView(); - if(!_done) - { - try - { - wait(_period * 1000); - } - catch(InterruptedException ex) - { - } - } - - if(_done) - { - break; - } - } - } - - synchronized public void - done() - { - if(!_done) - { - _done = true; - notify(); - } - } - - private final long _period; - private final MetricsView _node; - private boolean _done = false; - } - // // This class allow to render a button in JTable cell. // @@ -242,7 +194,7 @@ public class MetricsViewEditor extends Editor implements MetricsFieldContext // // Stop the refresh thread. // - MetricsViewEditor.stopRefreshThread(); + MetricsViewEditor.stopRefresh(); // // If selected node is a MetricsView and it is enabled; start the refresh thread. @@ -250,7 +202,7 @@ public class MetricsViewEditor extends Editor implements MetricsFieldContext if(e.isAddedPath() && e.getPath().getLastPathComponent() instanceof MetricsView && ((MetricsView )e.getPath().getLastPathComponent()).isEnabled()) { - MetricsViewEditor.startRefreshThread((MetricsView)e.getPath().getLastPathComponent()); + MetricsViewEditor.startRefresh((MetricsView)e.getPath().getLastPathComponent()); } if(e.isAddedPath()) @@ -311,19 +263,25 @@ public class MetricsViewEditor extends Editor implements MetricsFieldContext return _refreshPeriod; } - synchronized static void startRefreshThread(MetricsView node) + synchronized static void startRefresh(final MetricsView node) { - assert(_refreshThread == null); - _refreshThread = new RefreshThread(_refreshPeriod, node); - _refreshThread.start(); + assert(_refreshFuture == null); + _refreshFuture = node.getCoordinator().getExecutor().scheduleAtFixedRate(new Runnable() + { + public void run() + { + node.fetchMetricsView(); + } + + }, _refreshPeriod, _refreshPeriod, java.util.concurrent.TimeUnit.MILLISECONDS); } - synchronized static void stopRefreshThread() + synchronized static void stopRefresh() { - if(_refreshThread != null) + if(_refreshFuture != null) { - _refreshThread.done(); - _refreshThread = null; + _refreshFuture.cancel(false); + _refreshFuture = null; } } @@ -1107,7 +1065,7 @@ public class MetricsViewEditor extends Editor implements MetricsFieldContext Map<Integer, MetricsField> _fields = new HashMap<Integer, MetricsField>(); } - private static RefreshThread _refreshThread; + private static java.util.concurrent.Future<?> _refreshFuture; private Map<String, JTable> _tables = new HashMap<String, JTable>(); diff --git a/java/src/IceGridGUI/SessionKeeper.java b/java/src/IceGridGUI/SessionKeeper.java index 7fc3072e147..246e43c2387 100644 --- a/java/src/IceGridGUI/SessionKeeper.java +++ b/java/src/IceGridGUI/SessionKeeper.java @@ -261,9 +261,41 @@ public class SessionKeeper } else { - _thread = new Pinger(_session, sessionTimeout * 1000 / 2); - _thread.setDaemon(true); - _thread.start(); + _keepAliveFuture = _coordinator.getExecutor().scheduleAtFixedRate(new Runnable() { + private void error(final Exception e) + { + SwingUtilities.invokeLater(new Runnable() + { + public void run() + { + sessionLost("Failed to contact the IceGrid registry: " + e.toString()); + } + }); + } + + public void run() + { + _session.begin_keepAlive(new Callback_AdminSession_keepAlive() + { + public void + response() + { + } + + public void + exception(Ice.LocalException ex) + { + error(ex); + } + + public void + exception(Ice.UserException ex) + { + error(ex); + } + }); + } + }, sessionTimeout / 2, sessionTimeout / 2, java.util.concurrent.TimeUnit.SECONDS); } try @@ -396,9 +428,10 @@ public class SessionKeeper void close(boolean destroySession) { - if(_thread != null) + if(_keepAliveFuture != null) { - _thread.done(); + _keepAliveFuture.cancel(false); + _keepAliveFuture = null; } if(_adapter != null) @@ -538,7 +571,7 @@ public class SessionKeeper private final AdminSessionPrx _session; private final boolean _routed; - private Pinger _thread; + private java.util.concurrent.Future<?> _keepAliveFuture; private Ice.ObjectAdapter _adapter; private AdminPrx _admin; @@ -4428,94 +4461,6 @@ public class SessionKeeper private KeyStorePanel _authorityCertificatesPanel; } - // - // We create a brand new Pinger thread for each session - // - class Pinger extends Thread - { - Pinger(AdminSessionPrx session, long period) - { - super("Pinger"); - - _session = session; - _period = period; - - if(_period <= 0) - { - _period = 5000; - } - } - - public void run() - { - boolean done = false; - - do - { - synchronized(this) - { - done = _done; - } - - if(!done) - { - try - { - _session.keepAlive(); - } - catch(final Exception e) - { - synchronized(this) - { - done = _done; - _done = true; - } - - if(!done) - { - SwingUtilities.invokeLater(new Runnable() - { - public void run() - { - sessionLost("Failed to contact the IceGrid registry: " + e.toString()); - } - }); - } - } - } - - synchronized(this) - { - if(!_done) - { - try - { - wait(_period); - } - catch(InterruptedException e) - { - // Ignored - } - } - done = _done; - } - } while(!done); - } - - public synchronized void done() - { - if(!_done) - { - _done = true; - notify(); - } - } - - private AdminSessionPrx _session; - private long _period; - private boolean _done = false; - } - SessionKeeper(Coordinator coordinator) { _coordinator = coordinator; diff --git a/java/src/IceInternal/ACMMonitor.java b/java/src/IceInternal/ACMMonitor.java index 24701416fe6..303bc8ded38 100644 --- a/java/src/IceInternal/ACMMonitor.java +++ b/java/src/IceInternal/ACMMonitor.java @@ -9,7 +9,7 @@ package IceInternal; -public interface ACMMonitor extends TimerTask +public interface ACMMonitor extends Runnable { void add(Ice.ConnectionI con); void remove(Ice.ConnectionI con); diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 81dfe8a5a4c..072cd8d1b64 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -9,7 +9,7 @@ package IceInternal; -public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, TimerTask +public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable { public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) { @@ -43,7 +43,8 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync } if(_timeoutRequestHandler != null) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; _timeoutRequestHandler = null; } _monitor.notifyAll(); @@ -70,7 +71,8 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync } if(_timeoutRequestHandler != null) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; _timeoutRequestHandler = null; } } @@ -92,7 +94,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync } public void - runTimerTask() + run() { __runTimerTask(); } diff --git a/java/src/IceInternal/ConnectionACMMonitor.java b/java/src/IceInternal/ConnectionACMMonitor.java index b8a8021c980..effc30949eb 100644 --- a/java/src/IceInternal/ConnectionACMMonitor.java +++ b/java/src/IceInternal/ConnectionACMMonitor.java @@ -11,7 +11,8 @@ package IceInternal; class ConnectionACMMonitor implements ACMMonitor { - ConnectionACMMonitor(FactoryACMMonitor parent, Timer timer, ACMConfig config) + ConnectionACMMonitor(FactoryACMMonitor parent, java.util.concurrent.ScheduledExecutorService timer, + ACMConfig config) { _parent = parent; _timer = timer; @@ -42,7 +43,8 @@ class ConnectionACMMonitor implements ACMMonitor _connection = connection; if(_config.timeout > 0) { - _timer.scheduleRepeated(this, _config.timeout / 2); + _future = _timer.scheduleAtFixedRate(this, _config.timeout / 2, _config.timeout / 2, + java.util.concurrent.TimeUnit.MILLISECONDS); } } @@ -53,7 +55,8 @@ class ConnectionACMMonitor implements ACMMonitor _connection = null; if(_config.timeout > 0) { - _timer.cancel(this); + _future.cancel(false); + _future = null; } } @@ -80,7 +83,7 @@ class ConnectionACMMonitor implements ACMMonitor } public void - runTimerTask() + run() { Ice.ConnectionI connection; synchronized(this) @@ -103,7 +106,8 @@ class ConnectionACMMonitor implements ACMMonitor } final private FactoryACMMonitor _parent; - final private Timer _timer; + final private java.util.concurrent.ScheduledExecutorService _timer; + private java.util.concurrent.Future<?> _future; final private ACMConfig _config; private Ice.ConnectionI _connection; diff --git a/java/src/IceInternal/FactoryACMMonitor.java b/java/src/IceInternal/FactoryACMMonitor.java index a1071e8a4b6..7a328a8b790 100644 --- a/java/src/IceInternal/FactoryACMMonitor.java +++ b/java/src/IceInternal/FactoryACMMonitor.java @@ -74,7 +74,9 @@ class FactoryACMMonitor implements ACMMonitor if(_connections.isEmpty()) { _connections.add(connection); - _instance.timer().scheduleRepeated(this, _config.timeout / 2); + assert _future == null; + _future = _instance.timer().scheduleAtFixedRate(this, _config.timeout / 2, _config.timeout / 2, + java.util.concurrent.TimeUnit.MILLISECONDS); } else { @@ -148,7 +150,7 @@ class FactoryACMMonitor implements ACMMonitor } public void - runTimerTask() + run() { synchronized(this) { @@ -172,7 +174,8 @@ class FactoryACMMonitor implements ACMMonitor if(_connections.isEmpty()) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; return; } } @@ -212,5 +215,6 @@ class FactoryACMMonitor implements ACMMonitor private java.util.Set<Ice.ConnectionI> _connections = new java.util.HashSet<Ice.ConnectionI>(); private java.util.List<Change> _changes = new java.util.ArrayList<Change>(); private java.util.List<Ice.ConnectionI> _reapedConnections = new java.util.ArrayList<Ice.ConnectionI>(); + private java.util.concurrent.Future<?> _future; }; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 0fa8d7e4422..824ac9c4ba4 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -216,7 +216,7 @@ public final class Instance return _retryQueue; } - synchronized public Timer + synchronized public java.util.concurrent.ScheduledExecutorService timer() { if(_state == StateDestroyed) @@ -896,11 +896,33 @@ public final class Instance // try { - _timer = new Timer(this); - if(initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0) - { - _timer.setPriority(Util.getThreadPriorityProperty(initializationData().properties, "Ice")); - } + java.util.concurrent.ScheduledThreadPoolExecutor executor = + new java.util.concurrent.ScheduledThreadPoolExecutor(1, + new java.util.concurrent.ThreadFactory() + { + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + if(initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0) + { + final int priority = Util.getThreadPriorityProperty( + initializationData().properties, "Ice"); + t.setPriority(priority); + } + + String threadName = initializationData().properties.getProperty("Ice.ProgramName"); + if(threadName.length() > 0) + { + threadName += "-"; + } + t.setName(threadName + "Ice.Timer"); + + return t; + } + }); + executor.setRemoveOnCancelPolicy(true); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + _timer = executor; } catch(RuntimeException ex) { @@ -1048,7 +1070,14 @@ public final class Instance if(_timer != null) { - _timer._destroy(); + // Shutdown the executor. It isn't necessary to call + // awaitTermination since the threads are not daemon and + // therefore the VM will block until all threads have + // terminated. + _timer.shutdown(); + // Once we support interrupt we can use shutdownNow. + //_timer.shutdownNow(); + _timer = null; } @@ -1238,7 +1267,7 @@ public final class Instance private ThreadPool _serverThreadPool; private EndpointHostResolver _endpointHostResolver; private RetryQueue _retryQueue; - private Timer _timer; + private java.util.concurrent.ScheduledExecutorService _timer; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index e5e143a7e71..d939318fd9e 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,7 +9,7 @@ package IceInternal; -public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, TimerTask +public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable { public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb) { @@ -134,7 +134,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } if(_timeoutRequestHandler != null) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; _timeoutRequestHandler = null; } _state |= Done | OK; @@ -165,7 +166,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } if(_timeoutRequestHandler != null) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; _timeoutRequestHandler = null; } } @@ -223,7 +225,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa if(_timeoutRequestHandler != null) { - _instance.timer().cancel(this); + _future.cancel(false); + _future = null; _timeoutRequestHandler = null; } @@ -412,7 +415,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa int invocationTimeout = _handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { - _instance.timer().schedule(this, invocationTimeout); + _future = _instance.timer().schedule(this, invocationTimeout, + java.util.concurrent.TimeUnit.MILLISECONDS); _timeoutRequestHandler = _handler; } } @@ -474,7 +478,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } public void - runTimerTask() + run() { __runTimerTask(); } diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index 77a82ae2772..15c4b26500f 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -44,7 +44,8 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync int invocationTimeout = handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { - _instance.timer().schedule(this, invocationTimeout); + _future = _instance.timer().schedule(this, invocationTimeout, + java.util.concurrent.TimeUnit.MILLISECONDS); _timeoutRequestHandler = handler; } } diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java index 864a2b8fb2e..6a6de427dfe 100644 --- a/java/src/IceInternal/RetryQueue.java +++ b/java/src/IceInternal/RetryQueue.java @@ -20,7 +20,7 @@ public class RetryQueue add(OutgoingAsync outAsync, int interval) { RetryTask task = new RetryTask(this, outAsync); - _instance.timer().schedule(task, interval); + task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS)); _requests.add(task); } @@ -29,7 +29,6 @@ public class RetryQueue { for(RetryTask task : _requests) { - _instance.timer().cancel(task); task.destroy(); } _requests.clear(); diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java index e1c44452448..6767526e9c1 100644 --- a/java/src/IceInternal/RetryTask.java +++ b/java/src/IceInternal/RetryTask.java @@ -9,7 +9,7 @@ package IceInternal; -class RetryTask implements TimerTask +class RetryTask implements Runnable { RetryTask(RetryQueue queue, OutgoingAsync outAsync) { @@ -18,7 +18,7 @@ class RetryTask implements TimerTask } public void - runTimerTask() + run() { if(_queue.remove(this)) { @@ -36,9 +36,16 @@ class RetryTask implements TimerTask public void destroy() { + _future.cancel(false); _outAsync.__invokeExceptionAsync(new Ice.CommunicatorDestroyedException()); } + public void setFuture(java.util.concurrent.Future<?> future) + { + _future = future; + } + private final RetryQueue _queue; private final OutgoingAsync _outAsync; + private java.util.concurrent.Future<?> _future; } diff --git a/java/src/IceInternal/Timer.java b/java/src/IceInternal/Timer.java deleted file mode 100644 index 74f31317be1..00000000000 --- a/java/src/IceInternal/Timer.java +++ /dev/null @@ -1,324 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -// -// NOTE: We don't use the java.util.Timer class for few reasons. The -// Java TimerTask is a class not an interface making it more difficult -// to use. The API is also a bit different, cancel() is a TimerTask -// method not a Timer method and calling purge() on the timer on a -// regular basis is required to allow canceled timer task objects to -// be garbage collected. -// -public final class Timer extends Thread -{ - // - // Renamed from destroy to _destroy to avoid a deprecation warning caused - // by the destroy method inherited from Thread. - // - public void - _destroy() - { - synchronized(this) - { - if(_instance == null) - { - return; - } - - _instance = null; - notify(); - - _tokens.clear(); - _tasks.clear(); - } - - while(true) - { - try - { - join(); - break; - } - catch(java.lang.InterruptedException ex) - { - } - } - } - - synchronized public void - schedule(TimerTask task, long delay) - { - if(_instance == null) - { - throw new Ice.CommunicatorDestroyedException(); - } - - final Token token = new Token(IceInternal.Time.currentMonotonicTimeMillis() + delay, ++_tokenId, 0, task); - - Token previous = _tasks.put(task, token); - assert previous == null; - _tokens.add(token); - - if(token.scheduledTime < _wakeUpTime) - { - notify(); - } - } - - synchronized public void - scheduleRepeated(TimerTask task, long period) - { - if(_instance == null) - { - throw new Ice.CommunicatorDestroyedException(); - } - - final Token token = new Token(IceInternal.Time.currentMonotonicTimeMillis() + period, ++_tokenId, period, task); - - Token previous = _tasks.put(task, token); - assert previous == null; - _tokens.add(token); - - if(token.scheduledTime < _wakeUpTime) - { - notify(); - } - } - - synchronized public boolean - cancel(TimerTask task) - { - if(_instance == null) - { - return false; - } - - Token token = _tasks.remove(task); - if(token == null) - { - return false; - } - - _tokens.remove(token); - return true; - } - - // - // Only for use by Instance. - // - Timer(IceInternal.Instance instance) - { - _instance = instance; - - String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); - if(threadName.length() > 0) - { - threadName += "-"; - } - setName(threadName + "Ice.Timer"); - - start(); - } - - protected synchronized void - finalize() - throws Throwable - { - try - { - IceUtilInternal.Assert.FinalizerAssert(_instance == null); - } - catch(java.lang.Exception ex) - { - } - finally - { - super.finalize(); - } - } - - public void - run() - { - Token token = null; - while(true) - { - synchronized(this) - { - if(_instance != null) - { - // - // If the task we just ran is a repeated task, schedule it - // again for executation if it wasn't canceled. - // - if(token != null && token.delay > 0) - { - if(_tasks.containsKey(token.task)) - { - token.scheduledTime = IceInternal.Time.currentMonotonicTimeMillis() + token.delay; - _tokens.add(token); - } - } - } - token = null; - - if(_instance == null) - { - break; - } - - if(_tokens.isEmpty()) - { - _wakeUpTime = Long.MAX_VALUE; - while(true) - { - try - { - wait(); - break; - } - catch(java.lang.InterruptedException ex) - { - } - } - } - - if(_instance == null) - { - break; - } - - while(!_tokens.isEmpty() && _instance != null) - { - long now = IceInternal.Time.currentMonotonicTimeMillis(); - Token first = _tokens.first(); - if(first.scheduledTime <= now) - { - _tokens.remove(first); - token = first; - if(token.delay == 0) - { - _tasks.remove(token.task); - } - break; - } - - _wakeUpTime = first.scheduledTime; - while(true) - { - try - { - wait(first.scheduledTime - now); - break; - } - catch(java.lang.InterruptedException ex) - { - } - } - } - - if(_instance == null) - { - break; - } - } - - if(token != null) - { - try - { - token.task.runTimerTask(); - } - catch(Exception ex) - { - synchronized(this) - { - if(_instance != null) - { - String s = "unexpected exception from task run method in timer thread:\n" + Ex.toString(ex); - _instance.initializationData().logger.error(s); - } - } - } - } - } - } - - static private class Token implements Comparable<Token> - { - public - Token(long scheduledTime, int id, long delay, TimerTask task) - { - this.scheduledTime = scheduledTime; - this.id = id; - this.delay = delay; - this.task = task; - } - - public int - compareTo(Token r) - { - // - // Token are sorted by scheduled time and token id. - // - if(scheduledTime < r.scheduledTime) - { - return -1; - } - else if(scheduledTime > r.scheduledTime) - { - return 1; - } - - if(id < r.id) - { - return -1; - } - else if(id > r.id) - { - return 1; - } - - return 0; - } - - public boolean - equals(Object obj) - { - if(this == obj) - { - return true; - } - if(obj instanceof Token) - { - return compareTo((Token)obj) == 0; - } - return false; - } - - public int - hashCode() - { - return id ^ (int)scheduledTime; - } - - long scheduledTime; - int id; // Since we can't compare references, we need to use another id. - long delay; - TimerTask task; - } - - private final java.util.SortedSet<Token> _tokens = new java.util.TreeSet<Token>(); - private final java.util.Map<TimerTask, Token> _tasks = new java.util.HashMap<TimerTask, Token>(); - private Instance _instance; - private long _wakeUpTime = Long.MAX_VALUE; - private int _tokenId = 0; -} diff --git a/java/src/IceInternal/TimerTask.java b/java/src/IceInternal/TimerTask.java deleted file mode 100644 index 447eb0f1fbc..00000000000 --- a/java/src/IceInternal/TimerTask.java +++ /dev/null @@ -1,15 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public interface TimerTask -{ - void runTimerTask(); -} |