diff options
author | Mark Spruiell <mes@zeroc.com> | 2001-11-20 18:38:21 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2001-11-20 18:38:21 +0000 |
commit | b57d1aad76206d429b1a9ad1270aed469eacfd25 (patch) | |
tree | 993b78ab6f0fffd1e11cb94feffe487959cdbfe4 /java/src/IceInternal/ThreadPool.java | |
parent | Turned off session caching as per Duncan's recommendation. (diff) | |
download | ice-b57d1aad76206d429b1a9ad1270aed469eacfd25.tar.bz2 ice-b57d1aad76206d429b1a9ad1270aed469eacfd25.tar.xz ice-b57d1aad76206d429b1a9ad1270aed469eacfd25.zip |
remove interrupt pipe; reaping fixes
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 224 |
1 files changed, 114 insertions, 110 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 2b431a6c333..7460c975679 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -19,7 +19,7 @@ public final class ThreadPool { ++_servers; } - _adds.add(new FdHandlerPair(fd, handler)); + _adds.add(new HandlerInfo(fd, handler)); setInterrupt(); } @@ -37,22 +37,12 @@ public final class ThreadPool _threadMutex.unlock(); } - public void + public synchronized void initiateServerShutdown() { - // Can't use _fdIntrWriteBuf because it's not thread-safe - final byte[] arr = { 1 }; - java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(arr); - try - { - _fdIntrWrite.write(buf); - } - catch (java.io.IOException ex) - { - Ice.SystemException sys = new Ice.SystemException(); - sys.initCause(ex); - throw sys; - } + assert(!_shutdown); + _shutdown = true; + setInterrupt(); } public synchronized void @@ -80,7 +70,7 @@ public final class ThreadPool public synchronized void waitUntilFinished() { - while (_handlerMap.size() > 0 && _threadNum > 0) + while (_handlers > 0 && _threadNum > 0) { try { @@ -91,7 +81,7 @@ public final class ThreadPool } } - if (_handlerMap.size() > 0) + if (_handlers > 0) { _instance.logger().error("can't wait for graceful application " + "termination in thread pool\n" + @@ -151,21 +141,15 @@ public final class ThreadPool { _instance = instance; _destroyed = false; + _interrupted = false; + _shutdown = false; + _handlers = 0; _servers = 0; _timeout = 0; try { _selector = java.nio.channels.Selector.open(); - _pipe = java.nio.channels.Pipe.open(); - _fdIntrRead = _pipe.source(); - _fdIntrRead.configureBlocking(false); - _fdIntrWrite = _pipe.sink(); - _fdIntrReadBuf = java.nio.ByteBuffer.allocateDirect(1); - _fdIntrWriteBuf = java.nio.ByteBuffer.allocateDirect(1); - _fdIntrRead.register(_selector, - java.nio.channels.SelectionKey.OP_READ); - _fdIntrReadKey = _fdIntrRead.keyFor(_selector); } catch (java.io.IOException ex) { @@ -208,6 +192,7 @@ public final class ThreadPool } } + _threads = new EventHandlerThread[_threadNum]; for (int i = 0; i < _threadNum; i++) { _threads[i] = new EventHandlerThread(this); @@ -243,21 +228,11 @@ public final class ThreadPool throws Throwable { assert(_destroyed); - if (_fdIntrWrite != null) - { - try - { - _fdIntrWrite.close(); - } - catch (java.io.IOException ex) - { - } - } - if (_fdIntrRead != null) + if (_selector != null) { try { - _fdIntrRead.close(); + _selector.close(); } catch (java.io.IOException ex) { @@ -267,7 +242,10 @@ public final class ThreadPool super.finalize(); } - private synchronized void + // + // Called by Instance + // + synchronized void destroy() { assert(!_destroyed); @@ -278,44 +256,17 @@ public final class ThreadPool private boolean clearInterrupt() { - try - { - boolean shutdown = false; - _fdIntrReadBuf.rewind(); - while (_fdIntrRead.read(_fdIntrReadBuf) == 1) - { - if (_fdIntrReadBuf.get(0) == 1) // Shutdown initiated? - { - shutdown = true; - } - _fdIntrReadBuf.rewind(); - } - - return shutdown; - } - catch (java.io.IOException ex) - { - Ice.SystemException sys = new Ice.SystemException(); - sys.initCause(ex); - throw sys; - } + _interrupted = false; + boolean shutdown = _shutdown; + _shutdown = false; + return shutdown; } private void setInterrupt() { - _fdIntrWriteBuf.rewind(); - _fdIntrWriteBuf.put(0, (byte)0); - try - { - _fdIntrWrite.write(_fdIntrReadBuf); - } - catch (java.io.IOException ex) - { - Ice.SystemException sys = new Ice.SystemException(); - sys.initCause(ex); - throw sys; - } + _interrupted = true; + _selector.wakeup(); // Causes select() to return immediately } private void @@ -376,11 +327,10 @@ public final class ThreadPool return; } - boolean interrupt = false; - if (_fdIntrReadKey.isReadable()) + boolean interrupt = _interrupted; + if (interrupt) { shutdown = clearInterrupt(); - interrupt = true; } if (!_adds.isEmpty()) @@ -391,14 +341,15 @@ public final class ThreadPool java.util.ListIterator p = _adds.listIterator(); while (p.hasNext()) { - FdHandlerPair pair = (FdHandlerPair)p.next(); - _reapList.addFirst(pair.fd); + HandlerInfo info = (HandlerInfo)p.next(); + addHandler(info); + _handlers++; try { - pair.fd.register( + info.fd.register( _selector, java.nio.channels.SelectionKey.OP_READ, - pair.handler); + info); } catch (java.io.ClosedChannelException ex) { @@ -419,19 +370,20 @@ public final class ThreadPool java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)p.next(); key.cancel(); - handler = (EventHandler)key.attachment(); - assert(handler != null); - handler.finished(); - if (handler.server()) + HandlerInfo info = (HandlerInfo)key.attachment(); + assert(info != null); + info.handler.finished(); + if (info.handler.server()) { --_servers; } - _reapList.remove(key.channel()); + + _handlers--; + removeHandler(info); } _removes.clear(); - // TODO - if (_handlerMap.empty() || _servers == 0) + if (_handlers == 0 || _servers == 0) { notifyAll(); // For waitUntil...Finished() methods. } @@ -446,30 +398,45 @@ public final class ThreadPool // Check if there are connections to reap. // reap = false; - // TODO - if (_maxConnections > 0 && - _handlerMap.size() > _maxConnections) + if (_maxConnections > 0 && _handlers > _maxConnections) { - // TODO + HandlerInfo info = _reapListEnd; + while (info != null) + { + if (!info.reaped) + { + info.reaped = true; + handler = info.handler; + reap = true; + break; + } + info = info.prev; + } } if (!reap) { java.util.Set keys = _selector.selectedKeys(); java.util.Iterator i = keys.iterator(); - while (handler == null && i.hasNext()) + assert(i.hasNext()); + java.nio.channels.SelectionKey key = + (java.nio.channels.SelectionKey)i.next(); + HandlerInfo info = (HandlerInfo)key.attachment(); + i.remove(); + assert(info != null); + + // + // Make the fd for the handler the most recently used + // one by moving it to the beginning of the the reap + // list. + // + if (info != _reapList) { - java.nio.channels.SelectionKey key = - (java.nio.channels.SelectionKey)i.next(); - if (key != _fdIntrReadKey) - { - handler = (EventHandler)key.attachment(); - } - i.remove(); + removeHandler(info); + addHandler(info); } - assert(handler != null); - // TODO: Update _reapList + handler = info.handler; } } @@ -530,13 +497,52 @@ public final class ThreadPool // TODO } - private static final class FdHandlerPair + private void + addHandler(HandlerInfo info) + { + info.next = _reapList; + info.prev = null; + if (_reapList != null) + { + _reapList.prev = info; + } + else + { + _reapListEnd = info; + } + _reapList = info; + } + + private void + removeHandler(HandlerInfo info) + { + // + // Remove from _reapList + // + if (info.prev == null) + { + _reapList = info.next; + } + else + { + info.prev.next = info.next; + } + if (info.next == null) + { + _reapListEnd = info.prev; + } + } + + private static final class HandlerInfo { java.nio.channels.SelectableChannel fd; EventHandler handler; + HandlerInfo prev; + HandlerInfo next; + boolean reaped; - FdHandlerPair(java.nio.channels.SelectableChannel fd, - EventHandler handler) + HandlerInfo(java.nio.channels.SelectableChannel fd, + EventHandler handler) { this.fd = fd; this.handler = handler; @@ -546,15 +552,13 @@ public final class ThreadPool private Instance _instance; private boolean _destroyed; private java.nio.channels.Selector _selector; - private java.nio.channels.Pipe _pipe; - private java.nio.channels.Pipe.SourceChannel _fdIntrRead; - private java.nio.channels.Pipe.SinkChannel _fdIntrWrite; - private java.nio.ByteBuffer _fdIntrReadBuf; - private java.nio.ByteBuffer _fdIntrWriteBuf; - private java.nio.channels.SelectionKey _fdIntrReadKey; + private boolean _interrupted; + private boolean _shutdown; private java.util.LinkedList _adds = new java.util.LinkedList(); private java.util.LinkedList _removes = new java.util.LinkedList(); - private java.util.LinkedList _reapList = new java.util.LinkedList(); + private int _handlers; + private HandlerInfo _reapList = null; + private HandlerInfo _reapListEnd = null; private int _servers; private int _timeout; private RecursiveMutex _threadMutex = new RecursiveMutex(); |