diff options
author | Mark Spruiell <mes@zeroc.com> | 2002-02-13 23:24:47 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2002-02-13 23:24:47 +0000 |
commit | 0432ff88f9855a4c1fdeeeec422046c2ee99436f (patch) | |
tree | 4f08e55eb7d60382cbfe3355a555d9fea1f2cffa /java/src/IceInternal/ThreadPool.java | |
parent | fixes (diff) | |
download | ice-0432ff88f9855a4c1fdeeeec422046c2ee99436f.tar.bz2 ice-0432ff88f9855a4c1fdeeeec422046c2ee99436f.tar.xz ice-0432ff88f9855a4c1fdeeeec422046c2ee99436f.zip |
ThreadPool bug fixes, cleanup
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 231 |
1 files changed, 164 insertions, 67 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index ae1398e1a78..b08b4bbeaea 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -28,16 +28,10 @@ public final class ThreadPool public synchronized void unregister(java.nio.channels.SelectableChannel fd, boolean callFinished) { - java.nio.channels.SelectionKey key = fd.keyFor(_selector); - if (key != null) - { - HandlerInfo info = (HandlerInfo)key.attachment(); - assert(info != null); - info.callFinished = callFinished; - info.next = _removes; - _removes = info; - setInterrupt(); - } + RemoveInfo info = new RemoveInfo(fd, callFinished); + info.next = _removes; + _removes = info; + setInterrupt(); } public void @@ -49,9 +43,16 @@ public final class ThreadPool public synchronized void initiateServerShutdown() { - assert(!_shutdown); - _shutdown = true; - setInterrupt(); // TODO: just use wakeup? + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + buf.put(0, (byte)1); + try + { + int n = _fdIntrWrite.write(buf); + assert(n == 1); + } + catch (java.io.IOException ex) + { + } } public synchronized void @@ -70,8 +71,7 @@ public final class ThreadPool if (_servers != 0) { - _instance.logger().error("can't wait for graceful server " + - "termination in thread pool\n" + + _instance.logger().error("can't wait for graceful server termination in thread pool\n" + "since all threads have vanished"); } } @@ -92,8 +92,7 @@ public final class ThreadPool if (_handlers != 0) { - _instance.logger().error("can't wait for graceful application " + - "termination in thread pool\n" + + _instance.logger().error("can't wait for graceful application termination in thread pool\n" + "since all threads have vanished"); } } @@ -149,8 +148,6 @@ public final class ThreadPool { _instance = instance; _destroyed = false; - _interrupted = false; - _shutdown = false; _adds = null; _removes = null; _handlers = 0; @@ -159,7 +156,22 @@ public final class ThreadPool try { + java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open(); + _fdIntrRead = pipe.source(); + _fdIntrWrite = pipe.sink(); + _fdIntrRead.configureBlocking(false); + } + catch (java.io.IOException ex) + { + Ice.SystemException sys = new Ice.SystemException(); + sys.initCause(ex); + throw sys; + } + + try + { _selector = java.nio.channels.Selector.open(); + _fdIntrReadKey = _fdIntrRead.register(_selector, java.nio.channels.SelectionKey.OP_READ); } catch (java.io.IOException ex) { @@ -168,8 +180,7 @@ public final class ThreadPool throw sys; } - String value = - _instance.properties().getProperty("Ice.ServerIdleTime"); + String value = _instance.properties().getProperty("Ice.ServerIdleTime"); if (value != null) { try @@ -217,8 +228,7 @@ public final class ThreadPool // Must be called after _threadNum is set int maxConnections = 0; - value = _instance.properties().getProperty( - "Ice.ThreadPool.MaxConnections"); + value = _instance.properties().getProperty("Ice.ThreadPool.MaxConnections"); if (value != null) { try @@ -248,6 +258,26 @@ public final class ThreadPool { } } + if (_fdIntrWrite != null) + { + try + { + _fdIntrWrite.close(); + } + catch (java.io.IOException ex) + { + } + } + if (_fdIntrRead != null) + { + try + { + _fdIntrRead.close(); + } + catch (java.io.IOException ex) + { + } + } super.finalize(); } @@ -266,17 +296,67 @@ public final class ThreadPool private boolean clearInterrupt() { - _interrupted = false; - boolean shutdown = _shutdown; - _shutdown = false; +/* +System.out.println("clearInterrupt"); +try +{ + throw new RuntimeException(); +} +catch (RuntimeException ex) +{ + ex.printStackTrace(); +} +*/ + boolean shutdown = false; + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + try + { + while (true) + { + buf.rewind(); + if (_fdIntrRead.read(buf) != 1) + { + break; + } + +//System.out.println(" got byte " + (int)buf.get(0)); + if (buf.get(0) == (byte)1) // Shutdown initiated? + { + shutdown = true; + } + } + } + catch (java.io.IOException ex) + { + } + return shutdown; } private void setInterrupt() { - _interrupted = true; - _selector.wakeup(); // Causes select() to return immediately +/* +System.out.println("setInterrupt"); +try +{ + throw new RuntimeException(); +} +catch (RuntimeException ex) +{ + ex.printStackTrace(); +} +*/ + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + buf.put(0, (byte)0); + try + { + int n = _fdIntrWrite.write(buf); + assert(n == 1); + } + catch (java.io.IOException ex) + { + } } private void @@ -284,6 +364,7 @@ public final class ThreadPool { boolean shutdown = false; final int timeoutMillis = _timeout * 1000; + java.util.Set keys = _selector.selectedKeys(); while (true) { @@ -295,11 +376,12 @@ public final class ThreadPool { if (shutdown) // Shutdown has been initiated. { +//System.out.println("ThreadPool - shutdown"); shutdown = false; _instance.objectAdapterFactory().shutdown(); } - _selector.selectedKeys().clear(); + keys.clear(); int ret = 0; try { @@ -316,7 +398,8 @@ public final class ThreadPool throw se; } - if (ret == 0 && !_interrupted) // Timeout. +//System.out.println("Select() returned " + ret); + if (ret == 0) // Timeout. { assert(_timeout > 0); _timeout = 0; @@ -330,11 +413,11 @@ public final class ThreadPool { if (_destroyed) { +//System.out.println("ThreadPool - destroyed"); // // Don't clear the interrupt fd if destroyed, so that // the other threads exit as well. // - _selector.wakeup(); return; } @@ -347,8 +430,7 @@ public final class ThreadPool while (info != null) { int op; - if ((info.fd.validOps() & - java.nio.channels.SelectionKey.OP_READ) > 0) + if ((info.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { op = java.nio.channels.SelectionKey.OP_READ; } @@ -360,8 +442,7 @@ public final class ThreadPool _handlers++; try { - info.key = - info.fd.register(_selector, op, info); + info.key = info.fd.register(_selector, op, info); } catch (java.nio.channels.ClosedChannelException ex) { @@ -379,15 +460,18 @@ public final class ThreadPool // // Handlers are permanently removed. // - HandlerInfo info = _removes; + RemoveInfo info = _removes; while (info != null) { - info.key.cancel(); - if (info.callFinished) + java.nio.channels.SelectionKey key = info.fd.keyFor(_selector); + assert(key != null); + HandlerInfo hinfo = (HandlerInfo)key.attachment(); + key.cancel(); + if (info.callFinished) // Call finished() on the handler? { - info.handler.finished(); + hinfo.handler.finished(); } - if (info.handler.server()) + if (hinfo.handler.server()) { --_servers; } @@ -409,36 +493,37 @@ public final class ThreadPool continue repeatSelect; } - java.util.Set keys = _selector.selectedKeys(); - if (keys.size() == 0) - { - shutdown = clearInterrupt(); - continue repeatSelect; - } - + java.nio.channels.SelectionKey key = null; java.util.Iterator i = keys.iterator(); while (i.hasNext()) { - java.nio.channels.SelectionKey key = - (java.nio.channels.SelectionKey)i.next(); // - // Ignore selection keys that have been - // cancelled + // Ignore selection keys that have been cancelled // - if (key.isValid()) + java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)i.next(); + if (k.isValid()) { - HandlerInfo info = - (HandlerInfo)key.attachment(); - assert(info != null); - handler = info.handler; + key = k; break; } } - if (handler == null) + if (key == null) { +//System.out.println("ThreadPool - didn't find a valid key"); + continue repeatSelect; + } + + if (key.channel() == _fdIntrRead) + { +//System.out.println("ThreadPool - input ready on the interrupt pipe"); + shutdown = clearInterrupt(); continue repeatSelect; } + + HandlerInfo info = (HandlerInfo)key.attachment(); + assert(info != null); + handler = info.handler; } // @@ -486,10 +571,11 @@ public final class ThreadPool if (stream.pos() != stream.size()) { handler.read(stream); - assert(stream.pos() != stream.size()); + assert(stream.pos() == stream.size()); } int pos = stream.pos(); + assert(pos >= Protocol.headerSize); stream.pos(0); byte protVer = stream.readByte(); if (protVer != Protocol.protocolVersion) @@ -520,7 +606,7 @@ public final class ThreadPool if (stream.pos() != stream.size()) { handler.read(stream); - assert(stream.pos() != stream.size()); + assert(stream.pos() == stream.size()); } } @@ -530,23 +616,35 @@ public final class ThreadPool EventHandler handler; java.nio.channels.SelectionKey key; HandlerInfo next; - boolean callFinished; - HandlerInfo(java.nio.channels.SelectableChannel fd, - EventHandler handler) + HandlerInfo(java.nio.channels.SelectableChannel fd, EventHandler handler) { this.fd = fd; this.handler = handler; } } + private static final class RemoveInfo + { + java.nio.channels.SelectableChannel fd; + boolean callFinished; + RemoveInfo next; + + RemoveInfo(java.nio.channels.SelectableChannel fd, boolean callFinished) + { + this.fd = fd; + this.callFinished = callFinished; + } + } + private Instance _instance; private boolean _destroyed; + private java.nio.channels.Pipe.SourceChannel _fdIntrRead; + private java.nio.channels.SelectionKey _fdIntrReadKey; + private java.nio.channels.Pipe.SinkChannel _fdIntrWrite; private java.nio.channels.Selector _selector; - private boolean _interrupted; - private boolean _shutdown; private HandlerInfo _adds; - private HandlerInfo _removes; + private RemoveInfo _removes; private int _handlers; private int _servers; private int _timeout; @@ -581,8 +679,7 @@ public final class ThreadPool java.io.PrintWriter pw = new java.io.PrintWriter(sw); ex.printStackTrace(pw); pw.flush(); - String s = "unknown exception in thread pool:\n" + - sw.toString(); + String s = "unknown exception in thread pool:\n" + sw.toString(); _pool._instance.logger().error(s); } |