diff options
author | Mark Spruiell <mes@zeroc.com> | 2002-04-16 23:02:05 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2002-04-16 23:02:05 +0000 |
commit | f735a39d64deba47b416e41442d1321f9c972d8c (patch) | |
tree | 12a7b8ed7c594f13ee511cec9e409669e7ad90fd /java/src/IceInternal/ThreadPool.java | |
parent | Win32 fixes for IceBox (diff) | |
download | ice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.bz2 ice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.xz ice-f735a39d64deba47b416e41442d1321f9c972d8c.zip |
align with C++ - fixes for the thread pool
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 258 |
1 files changed, 143 insertions, 115 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 24e6043f586..c73731bccc4 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -19,19 +19,39 @@ public final class ThreadPool { ++_servers; } - HandlerInfo info = new HandlerInfo(fd, handler); - info.next = _adds; - _adds = info; - setInterrupt(); + else + { + ++_clients; + } + _changes.add(new FdHandlerPair(fd, handler)); + setInterrupt(0); + } + + public synchronized void + unregister(java.nio.channels.SelectableChannel fd) + { + _changes.add(new FdHandlerPair(fd, null)); + setInterrupt(0); } public synchronized void - unregister(java.nio.channels.SelectableChannel fd, boolean callFinished) + serverIsNowClient() { - RemoveInfo info = new RemoveInfo(fd, callFinished); - info.next = _removes; - _removes = info; - setInterrupt(); + ++_clients; + assert(_servers > 0); + --_servers; + if (_servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. + } + } + + public synchronized void + clientIsNowServer() + { + ++_servers; + assert(_clients > 0); + --_clients; } public void @@ -41,26 +61,17 @@ public final class ThreadPool _threadMutex.unlock(); } - public synchronized void + public void initiateServerShutdown() { //System.out.println("ThreadPool - initiate server shutdown"); - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)1); - try - { - int n = _fdIntrWrite.write(buf); - assert(n == 1); - } - catch (java.io.IOException ex) - { - } + setInterrupt(1); } public synchronized void waitUntilServerFinished() { - while (_servers != 0 && _threadNum != 0) + while (_clients + _servers != 0 && _threadNum != 0) { try { @@ -92,11 +103,15 @@ public final class ThreadPool } } - if (_handlers != 0) + if (_clients + _servers != 0) { _logger.error("can't wait for graceful application termination in thread pool\n" + "since all threads have vanished"); } + else + { + assert(_handlers == 0); + } } public void @@ -152,9 +167,8 @@ public final class ThreadPool _logger = _instance.logger(); _properties = _instance.properties(); _destroyed = false; - _adds = null; - _removes = null; _handlers = 0; + _clients = 0; _servers = 0; _timeout = 0; @@ -285,7 +299,7 @@ public final class ThreadPool //System.out.println("ThreadPool - destroy"); assert(!_destroyed); _destroyed = true; - setInterrupt(); + setInterrupt(0); } private boolean @@ -302,7 +316,8 @@ catch (RuntimeException ex) ex.printStackTrace(); } */ - boolean shutdown = false; + byte b = 0; + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); try { @@ -315,21 +330,22 @@ catch (RuntimeException ex) } //System.out.println(" clearInterrupt - got byte " + (int)buf.get(0)); - if (buf.get(0) == (byte)1) // Shutdown initiated? - { - shutdown = true; - } + b = buf.get(0); + break; } } catch (java.io.IOException ex) { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; } - return shutdown; + return b == (byte)1; // Return true if shutdown has been initiated. } private void - setInterrupt() + setInterrupt(int b) { //System.out.println("setInterrupt"); /* @@ -343,14 +359,19 @@ catch (RuntimeException ex) } //*/ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - try - { - int n = _fdIntrWrite.write(buf); - assert(n == 1); - } - catch (java.io.IOException ex) + buf.put(0, (byte)b); + while (buf.hasRemaining()) { + try + { + _fdIntrWrite.write(buf); + } + catch (java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } } } @@ -461,31 +482,66 @@ catch (RuntimeException ex) } EventHandler handler = null; - RemoveInfo remove = null; + boolean finished = false; synchronized (this) { - if (_destroyed) + if (_keys.remove(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) { + if (!_keys.isEmpty()) + { + _keysIter = _keys.iterator(); + } + else + { + _keysIter = null; + } + + // + // There are three possibilities for an interrupt: + // + // - The thread pool has been destroyed. + // + // - Server shutdown has been initiated. + // + // - An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if (_destroyed) + { //System.out.println("ThreadPool - destroyed, thread id = " + Thread.currentThread()); + // + // Don't clear the interrupt fd if destroyed, so that + // the other threads exit as well. + // + return; + } + + shutdown = clearInterrupt(); + // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. + // Server shutdown? // - return; - } + if (shutdown) + { + continue repeatSelect; + } - if (_adds != null) - { // - // New handlers have been added. + // An event handler must have been registered or + // unregistered. // - HandlerInfo info = _adds; - while (info != null) + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if (change.handler != null) // Addition if handler is set. { -//System.out.println("ThreadPool - adding fd " + info.fd); +//System.out.println("ThreadPool - adding handler"); int op; - if ((info.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { op = java.nio.channels.SelectionKey.OP_READ; } @@ -497,37 +553,29 @@ catch (RuntimeException ex) _handlers++; try { - info.key = info.fd.register(_selector, op, info); + change.fd.register(_selector, op, change.handler); } catch (java.nio.channels.ClosedChannelException ex) { assert(false); } - HandlerInfo next = info.next; - info.next = null; - info = next; + continue repeatSelect; + } + else // Removal if handler is not set. + { +//System.out.println("ThreadPool - removing handler"); + java.nio.channels.SelectionKey key = change.fd.keyFor(_selector); + assert(key != null); + handler = (EventHandler)key.attachment(); + finished = true; + --_handlers; + key.cancel(); + // Don't goto repeatSelect; we have to call + // finished() on the event handler below, outside + // the thread synchronization. } - _adds = null; - } - - if (_removes != null) - { - // - // Handlers are permanently removed. - // - remove = _removes; - _removes = _removes.next; - java.nio.channels.SelectionKey key = remove.fd.keyFor(_selector); - assert(key != null); - HandlerInfo hinfo = (HandlerInfo)key.attachment(); - key.cancel(); - handler = hinfo.handler; -//System.out.println("ThreadPool - remove fd = " + remove.fd); -//if (_removes != null) -// System.out.println("ThreadPool - more fds to be removed"); } - - if (handler == null) + else { java.nio.channels.SelectionKey key = null; while (_keysIter.hasNext()) @@ -537,7 +585,7 @@ catch (RuntimeException ex) // java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)_keysIter.next(); _keysIter.remove(); - if (k.isValid()) + if (k.isValid() && key != _fdIntrReadKey) { //System.out.println("ThreadPool - found a key"); key = k; @@ -557,41 +605,35 @@ catch (RuntimeException ex) continue repeatSelect; } - if (key.channel() == _fdIntrRead) - { -//System.out.println("ThreadPool - input ready on the interrupt pipe"); - shutdown = clearInterrupt(); - continue repeatSelect; - } - - HandlerInfo info = (HandlerInfo)key.attachment(); - assert(info != null); - handler = info.handler; + handler = (EventHandler)key.attachment(); } } assert(handler != null); - if (remove != null) + if (finished) { // - // Call finished() on the handler if necessary. + // Notify a handler about it's removal from the thread + // pool. // - if (remove.callFinished) - { - handler.finished(); - handler._stream.destroy(); - } + handler.finished(); + handler._stream.destroy(); synchronized (this) { - _handlers--; if (handler.server()) { + assert(_servers > 0); --_servers; } + else + { + assert(_clients > 0); + --_clients; + } //System.out.println("ThreadPool - _handlers = " + _handlers + ", _servers = " + _servers); - if (_handlers == 0 || _servers == 0) + if (_clients == 0 || _servers == 0) { notifyAll(); // For waitUntil...Finished() methods. } @@ -624,6 +666,7 @@ catch (RuntimeException ex) } catch (Ice.LocalException ex) { +//System.out.println("ThreadPool - informing handler about exception " + ex); handler.exception(ex); continue repeatSelect; } @@ -714,33 +757,18 @@ catch (RuntimeException ex) return true; } - private static final class HandlerInfo + private static final class FdHandlerPair { java.nio.channels.SelectableChannel fd; EventHandler handler; - java.nio.channels.SelectionKey key; - HandlerInfo next; - HandlerInfo(java.nio.channels.SelectableChannel fd, EventHandler handler) + FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler) { this.fd = fd; this.handler = handler; } } - private static final class RemoveInfo - { - java.nio.channels.SelectableChannel fd; - boolean callFinished; - RemoveInfo next; - - RemoveInfo(java.nio.channels.SelectableChannel fd, boolean callFinished) - { - this.fd = fd; - this.callFinished = callFinished; - } - } - private Instance _instance; private Ice.Logger _logger; private Ice.Properties _properties; @@ -751,9 +779,9 @@ catch (RuntimeException ex) private java.nio.channels.Selector _selector; private java.util.Set _keys; private java.util.Iterator _keysIter; - private HandlerInfo _adds; - private RemoveInfo _removes; + private java.util.LinkedList _changes = new java.util.LinkedList(); private int _handlers; + private int _clients; private int _servers; private int _timeout; private RecursiveMutex _threadMutex = new RecursiveMutex(); |