diff options
author | Mark Spruiell <mes@zeroc.com> | 2002-05-02 03:27:07 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2002-05-02 03:27:07 +0000 |
commit | 5f342d668a30647b7d7182b4b296e65f7cfc2b07 (patch) | |
tree | 53f13aa7e79a77da4ca6133cf170812d3995d8b2 /java/src/IceInternal/ThreadPool.java | |
parent | adding assertions (diff) | |
download | ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.bz2 ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.xz ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.zip |
align with C++ changes for thread pool, properties, plug-ins
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 156 |
1 files changed, 40 insertions, 116 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 13a7a646930..94d71ada0f6 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -15,14 +15,7 @@ public final class ThreadPool public synchronized void _register(java.nio.channels.SelectableChannel fd, EventHandler handler) { - if (handler.server()) - { - ++_servers; - } - else - { - ++_clients; - } + ++_handlers; _changes.add(new FdHandlerPair(fd, handler)); setInterrupt(0); } @@ -34,62 +27,24 @@ public final class ThreadPool setInterrupt(0); } - public synchronized void - serverIsNowClient() - { - ++_clients; - assert(_servers > 0); - --_servers; - if (_servers == 0) - { - notifyAll(); // For waitUntil...Finished() methods. - } - } - - public synchronized void - clientIsNowServer() - { - ++_servers; - assert(_clients > 0); - --_clients; - } - public void promoteFollower() { + if (_multipleThreads) + { //System.out.println("ThreadPool - promote follower - lock count = " + _threadMutex.count()); - _threadMutex.unlock(); + _threadMutex.unlock(); + } } public void - initiateServerShutdown() + initiateShutdown() { //System.out.println("ThreadPool - initiate server shutdown"); setInterrupt(1); } public synchronized void - waitUntilServerFinished() - { - while (_servers != 0 && _threadNum != 0) - { - try - { - wait(); - } - catch (InterruptedException ex) - { - } - } - - if (_servers != 0) - { - _logger.error("can't wait for graceful server termination in thread pool\n" + - "since all threads have vanished"); - } - } - - public synchronized void waitUntilFinished() { while (_handlers != 0 && _threadNum != 0) @@ -103,14 +58,10 @@ public final class ThreadPool } } - if (_clients + _servers != 0) + if (_handlers != 0) { - _logger.error("can't wait for graceful application termination in thread pool\n" + - "since all threads have vanished"); - } - else - { - assert(_handlers == 0); + _instance.logger().error("can't wait for graceful server termination in thread pool\n" + + "since all threads have vanished"); } } @@ -139,38 +90,16 @@ public final class ThreadPool } } - public synchronized void - setMaxConnections(int maxConnections) - { - if (maxConnections < _threadNum + 1 && maxConnections != 0) - { - _maxConnections = _threadNum + 1; - } - else - { - _maxConnections = maxConnections; - } - } - - public synchronized int - getMaxConnections() - { - return _maxConnections; - } - // // Only for use by Instance // - ThreadPool(Instance instance) + ThreadPool(Instance instance, boolean server) { _instance = instance; - _logger = _instance.logger(); - _properties = _instance.properties(); _destroyed = false; _handlers = 0; - _clients = 0; - _servers = 0; _timeout = 0; + _multipleThreads = false; Network.SocketPair pair = Network.createPipe(); _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; @@ -206,13 +135,26 @@ public final class ThreadPool // _keysIter = null; - _timeout = _properties.getPropertyAsInt("Ice.ServerIdleTime"); - _threadNum = _properties.getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10); + if (server) + { + _timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime"); + _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10); + } + else + { + _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 10); + } + if (_threadNum < 1) { _threadNum = 1; } + if (_threadNum > 1) + { + _multipleThreads = true; + } + try { _threads = new EventHandlerThread[_threadNum]; @@ -227,9 +169,6 @@ public final class ThreadPool destroy(); throw ex; } - - // Must be called after _threadNum is set. - setMaxConnections(_properties.getPropertyAsInt("Ice.ThreadPool.MaxConnections")); } protected void @@ -378,8 +317,11 @@ catch (RuntimeException ex) while (true) { - _threadMutex.lock(); + if (_multipleThreads) + { + _threadMutex.lock(); //System.out.println("ThreadPool - thread " + Thread.currentThread() + " has the lock"); + } repeatSelect: @@ -520,7 +462,7 @@ catch (RuntimeException ex) if (change.handler != null) // Addition if handler is set. { -//System.out.println("ThreadPool - adding handler"); +//System.err.println("ThreadPool - adding handler" + change.handler + ", stream = " + change.handler._stream); int op; if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { @@ -531,7 +473,6 @@ catch (RuntimeException ex) op = java.nio.channels.SelectionKey.OP_ACCEPT; } - _handlers++; try { change.fd.register(_selector, op, change.handler); @@ -549,7 +490,6 @@ catch (RuntimeException ex) assert(key != null); handler = (EventHandler)key.attachment(); finished = true; - --_handlers; key.cancel(); // Don't goto repeatSelect; we have to call // finished() on the event handler below, outside @@ -587,6 +527,7 @@ catch (RuntimeException ex) } handler = (EventHandler)key.attachment(); +//System.err.println("Selected handler with stream " + handler._stream + " - key is " + (key.isValid() ? "" : "NOT ") + "valid"); } } @@ -598,25 +539,14 @@ catch (RuntimeException ex) // Notify a handler about it's removal from the thread // pool. // - handler.finished(); - handler._stream.destroy(); + handler.finished(this); synchronized (this) { - if (handler.server()) - { - assert(_servers > 0); - --_servers; - } - else - { - assert(_clients > 0); - --_clients; - } -//System.out.println("ThreadPool - handler is finished - _handlers = " + _handlers + ", _clients = " + _clients + ", _servers = " + _servers); - if (_clients == 0 || _servers == 0) + assert(_handlers > 0); + if (--_handlers == 0) { - notifyAll(); // For waitUntil...Finished() methods. + notifyAll(); // For waitUntilFinished(). } } } @@ -656,7 +586,7 @@ catch (RuntimeException ex) assert(stream.pos() == stream.size()); } - handler.message(stream); + handler.message(stream, this); } finally { @@ -751,8 +681,6 @@ catch (RuntimeException ex) } private Instance _instance; - private Ice.Logger _logger; - private Ice.Properties _properties; private boolean _destroyed; private java.nio.channels.ReadableByteChannel _fdIntrRead; private java.nio.channels.SelectionKey _fdIntrReadKey; @@ -762,10 +690,9 @@ catch (RuntimeException ex) private java.util.Iterator _keysIter; private java.util.LinkedList _changes = new java.util.LinkedList(); private int _handlers; - private int _clients; - private int _servers; private int _timeout; private RecursiveMutex _threadMutex = new RecursiveMutex(); + private boolean _multipleThreads; private final static class EventHandlerThread extends Thread { @@ -790,7 +717,7 @@ catch (RuntimeException ex) ex.printStackTrace(pw); pw.flush(); String s = "exception in thread pool:\n" + sw.toString(); - _pool._logger.error(s); + _pool._instance.logger().error(s); } catch (RuntimeException ex) { @@ -799,7 +726,7 @@ catch (RuntimeException ex) ex.printStackTrace(pw); pw.flush(); String s = "unknown exception in thread pool:\n" + sw.toString(); - _pool._logger.error(s); + _pool._instance.logger().error(s); } synchronized(_pool) @@ -832,7 +759,4 @@ catch (RuntimeException ex) } private EventHandlerThread[] _threads; private int _threadNum; // Number of running threads - private int _maxConnections; // Maximum number of connections. If set to - // zero, the number of connections is not - // limited. } |