diff options
author | Mark Spruiell <mes@zeroc.com> | 2002-04-16 23:02:05 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2002-04-16 23:02:05 +0000 |
commit | f735a39d64deba47b416e41442d1321f9c972d8c (patch) | |
tree | 12a7b8ed7c594f13ee511cec9e409669e7ad90fd /java/src | |
parent | Win32 fixes for IceBox (diff) | |
download | ice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.bz2 ice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.xz ice-f735a39d64deba47b416e41442d1321f9c972d8c.zip |
align with C++ - fixes for the thread pool
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/Connection.java | 39 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 60 | ||||
-rw-r--r-- | java/src/IceInternal/Network.java | 25 | ||||
-rw-r--r-- | java/src/IceInternal/TcpAcceptor.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/TcpConnector.java | 1 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 258 | ||||
-rw-r--r-- | java/src/IceInternal/UdpTransceiver.java | 2 |
7 files changed, 227 insertions, 160 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index f8ff8bca835..7e9162d9730 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -76,10 +76,11 @@ public final class Connection extends EventHandler try { assert(_proxyUsageCount >= 0); - if (--_proxyUsageCount == 0) + --_proxyUsageCount; + if (_proxyUsageCount == 0 && _adapter == null) { assert(_requests.isEmpty()); - setState(StateClosing); + setState(StateClosing, new Ice.CloseConnectionException()); } } finally @@ -285,6 +286,25 @@ public final class Connection extends EventHandler _mutex.lock(); try { + // + // In closed and holding state, we are not registered with the + // thread pool. For all other states, we have to notify the thread + // pool in case this event handler changed from a client to a + // server or vice versa. + // + if (_state != StateHolding && _state != StateClosed) + { + if (adapter != null && _adapter == null) + { + _threadPool.clientIsNowServer(); + } + + if (adapter == null && _adapter != null) + { + _threadPool.serverIsNowClient(); + } + } + _adapter = adapter; } finally @@ -611,11 +631,14 @@ public final class Connection extends EventHandler _mutex.lock(); try { - _threadPool.promoteFollower(); + assert(_state == StateClosed || _state == StateHolding); - assert(_state == StateClosed); + _threadPool.promoteFollower(); - _transceiver.close(); + if (_state == StateClosed) + { + _transceiver.close(); + } } finally { @@ -651,7 +674,7 @@ public final class Connection extends EventHandler try { - setState(StateClosing); + setState(StateClosing, new Ice.CloseConnectionException()); return true; } finally @@ -810,7 +833,7 @@ public final class Connection extends EventHandler { return; } - _threadPool.unregister(_transceiver.fd(), false); + _threadPool.unregister(_transceiver.fd()); break; } @@ -841,7 +864,7 @@ public final class Connection extends EventHandler // _threadPool._register(_transceiver.fd(), this); } - _threadPool.unregister(_transceiver.fd(), true); + _threadPool.unregister(_transceiver.fd()); break; } } diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 57d03cd283b..d6f8b57d37d 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -155,40 +155,44 @@ public class IncomingConnectionFactory extends EventHandler public synchronized void finished() { - _threadPool.promoteFollower(); + assert(_state == StateClosed || _state == StateHolding); - assert(_state == StateClosed); - assert(_connections.isEmpty()); + _threadPool.promoteFollower(); - try + if (_state == StateClosed) { - // - // Clear listen() backlog properly by accepting all queued - // connections, and then shutting them down. - // - while (true) + assert(_connections.isEmpty()); + + try { - try + // + // Clear listen() backlog properly by accepting all queued + // connections, and then shutting them down. + // + while (true) { - Transceiver transceiver = _acceptor.accept(0); - Connection connection = new Connection(_instance, transceiver, _endpoint, _adapter); - connection.exception(new Ice.ObjectAdapterDeactivatedException()); - } - catch (Ice.TimeoutException ex) - { - break; // Exit loop on timeout. + try + { + Transceiver transceiver = _acceptor.accept(0); + Connection connection = new Connection(_instance, transceiver, _endpoint, _adapter); + connection.exception(new Ice.ObjectAdapterDeactivatedException()); + } + catch (Ice.TimeoutException ex) + { + break; // Exit loop on timeout. + } } } - } - catch (Ice.LocalException ex) - { - if (_warn) + catch (Ice.LocalException ex) { - warning(ex); + if (_warn) + { + warning(ex); + } } - } - _acceptor.close(); + _acceptor.close(); + } } public void @@ -307,14 +311,14 @@ public class IncomingConnectionFactory extends EventHandler case StateHolding: { - if (_state != StateActive) // Can only switch from active to - { // holding + if (_state != StateActive) // Can only switch from active to holding + { return; } if (_threadPool != null) { - _threadPool.unregister(_acceptor.fd(), false); + _threadPool.unregister(_acceptor.fd()); } java.util.ListIterator iter = _connections.listIterator(); @@ -338,7 +342,7 @@ public class IncomingConnectionFactory extends EventHandler { _threadPool._register(_acceptor.fd(), this); } - _threadPool.unregister(_acceptor.fd(), true); + _threadPool.unregister(_acceptor.fd()); } java.util.ListIterator iter = _connections.listIterator(); diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java index 67612caf124..2f5634411f3 100644 --- a/java/src/IceInternal/Network.java +++ b/java/src/IceInternal/Network.java @@ -21,7 +21,6 @@ public final class Network java.net.Socket socket = fd.socket(); socket.setTcpNoDelay(true); socket.setKeepAlive(true); - fd.configureBlocking(false); return fd; } catch (java.io.IOException ex) @@ -45,7 +44,6 @@ public final class Network //java.net.Socket socket = fd.socket(); //socket.setTcpNoDelay(true); //socket.setKeepAlive(true); - fd.configureBlocking(false); return fd; } catch (java.io.IOException ex) @@ -61,9 +59,22 @@ public final class Network { try { - java.nio.channels.DatagramChannel fd = java.nio.channels.DatagramChannel.open(); - fd.configureBlocking(false); - return fd; + return java.nio.channels.DatagramChannel.open(); + } + catch (java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + public static void + setBlock(java.nio.channels.SelectableChannel fd, boolean block) + { + try + { + fd.configureBlocking(block); } catch (java.io.IOException ex) { @@ -304,10 +315,6 @@ public final class Network java.net.Socket socket = result.socket(); socket.setTcpNoDelay(true); socket.setKeepAlive(true); - // - // Need to set non-blocking in order to use Selector - // - result.configureBlocking(false); } catch (java.io.IOException ex) { diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java index 3c339d8fe33..53761d0c731 100644 --- a/java/src/IceInternal/TcpAcceptor.java +++ b/java/src/IceInternal/TcpAcceptor.java @@ -58,6 +58,7 @@ class TcpAcceptor implements Acceptor accept(int timeout) { java.nio.channels.SocketChannel fd = Network.doAccept(_fd, timeout); + Network.setBlock(fd, false); if (_traceLevels.network >= 1) { @@ -102,6 +103,7 @@ class TcpAcceptor implements Acceptor try { _fd = Network.createTcpServerSocket(); + Network.setBlock(_fd, false); java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host, port); _addr = Network.doBind(_fd, addr); } diff --git a/java/src/IceInternal/TcpConnector.java b/java/src/IceInternal/TcpConnector.java index 91ef1c2c75c..3361f9745b3 100644 --- a/java/src/IceInternal/TcpConnector.java +++ b/java/src/IceInternal/TcpConnector.java @@ -22,6 +22,7 @@ final class TcpConnector implements Connector } java.nio.channels.SocketChannel fd = Network.createTcpSocket(); + Network.setBlock(fd, false); Network.doConnect(fd, _addr, timeout); if (_traceLevels.network >= 1) diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 24e6043f586..c73731bccc4 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -19,19 +19,39 @@ public final class ThreadPool { ++_servers; } - HandlerInfo info = new HandlerInfo(fd, handler); - info.next = _adds; - _adds = info; - setInterrupt(); + else + { + ++_clients; + } + _changes.add(new FdHandlerPair(fd, handler)); + setInterrupt(0); + } + + public synchronized void + unregister(java.nio.channels.SelectableChannel fd) + { + _changes.add(new FdHandlerPair(fd, null)); + setInterrupt(0); } public synchronized void - unregister(java.nio.channels.SelectableChannel fd, boolean callFinished) + serverIsNowClient() { - RemoveInfo info = new RemoveInfo(fd, callFinished); - info.next = _removes; - _removes = info; - setInterrupt(); + ++_clients; + assert(_servers > 0); + --_servers; + if (_servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. + } + } + + public synchronized void + clientIsNowServer() + { + ++_servers; + assert(_clients > 0); + --_clients; } public void @@ -41,26 +61,17 @@ public final class ThreadPool _threadMutex.unlock(); } - public synchronized void + public void initiateServerShutdown() { //System.out.println("ThreadPool - initiate server shutdown"); - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)1); - try - { - int n = _fdIntrWrite.write(buf); - assert(n == 1); - } - catch (java.io.IOException ex) - { - } + setInterrupt(1); } public synchronized void waitUntilServerFinished() { - while (_servers != 0 && _threadNum != 0) + while (_clients + _servers != 0 && _threadNum != 0) { try { @@ -92,11 +103,15 @@ public final class ThreadPool } } - if (_handlers != 0) + if (_clients + _servers != 0) { _logger.error("can't wait for graceful application termination in thread pool\n" + "since all threads have vanished"); } + else + { + assert(_handlers == 0); + } } public void @@ -152,9 +167,8 @@ public final class ThreadPool _logger = _instance.logger(); _properties = _instance.properties(); _destroyed = false; - _adds = null; - _removes = null; _handlers = 0; + _clients = 0; _servers = 0; _timeout = 0; @@ -285,7 +299,7 @@ public final class ThreadPool //System.out.println("ThreadPool - destroy"); assert(!_destroyed); _destroyed = true; - setInterrupt(); + setInterrupt(0); } private boolean @@ -302,7 +316,8 @@ catch (RuntimeException ex) ex.printStackTrace(); } */ - boolean shutdown = false; + byte b = 0; + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); try { @@ -315,21 +330,22 @@ catch (RuntimeException ex) } //System.out.println(" clearInterrupt - got byte " + (int)buf.get(0)); - if (buf.get(0) == (byte)1) // Shutdown initiated? - { - shutdown = true; - } + b = buf.get(0); + break; } } catch (java.io.IOException ex) { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; } - return shutdown; + return b == (byte)1; // Return true if shutdown has been initiated. } private void - setInterrupt() + setInterrupt(int b) { //System.out.println("setInterrupt"); /* @@ -343,14 +359,19 @@ catch (RuntimeException ex) } //*/ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - try - { - int n = _fdIntrWrite.write(buf); - assert(n == 1); - } - catch (java.io.IOException ex) + buf.put(0, (byte)b); + while (buf.hasRemaining()) { + try + { + _fdIntrWrite.write(buf); + } + catch (java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } } } @@ -461,31 +482,66 @@ catch (RuntimeException ex) } EventHandler handler = null; - RemoveInfo remove = null; + boolean finished = false; synchronized (this) { - if (_destroyed) + if (_keys.remove(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) { + if (!_keys.isEmpty()) + { + _keysIter = _keys.iterator(); + } + else + { + _keysIter = null; + } + + // + // There are three possibilities for an interrupt: + // + // - The thread pool has been destroyed. + // + // - Server shutdown has been initiated. + // + // - An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if (_destroyed) + { //System.out.println("ThreadPool - destroyed, thread id = " + Thread.currentThread()); + // + // Don't clear the interrupt fd if destroyed, so that + // the other threads exit as well. + // + return; + } + + shutdown = clearInterrupt(); + // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. + // Server shutdown? // - return; - } + if (shutdown) + { + continue repeatSelect; + } - if (_adds != null) - { // - // New handlers have been added. + // An event handler must have been registered or + // unregistered. // - HandlerInfo info = _adds; - while (info != null) + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if (change.handler != null) // Addition if handler is set. { -//System.out.println("ThreadPool - adding fd " + info.fd); +//System.out.println("ThreadPool - adding handler"); int op; - if ((info.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { op = java.nio.channels.SelectionKey.OP_READ; } @@ -497,37 +553,29 @@ catch (RuntimeException ex) _handlers++; try { - info.key = info.fd.register(_selector, op, info); + change.fd.register(_selector, op, change.handler); } catch (java.nio.channels.ClosedChannelException ex) { assert(false); } - HandlerInfo next = info.next; - info.next = null; - info = next; + continue repeatSelect; + } + else // Removal if handler is not set. + { +//System.out.println("ThreadPool - removing handler"); + java.nio.channels.SelectionKey key = change.fd.keyFor(_selector); + assert(key != null); + handler = (EventHandler)key.attachment(); + finished = true; + --_handlers; + key.cancel(); + // Don't goto repeatSelect; we have to call + // finished() on the event handler below, outside + // the thread synchronization. } - _adds = null; - } - - if (_removes != null) - { - // - // Handlers are permanently removed. - // - remove = _removes; - _removes = _removes.next; - java.nio.channels.SelectionKey key = remove.fd.keyFor(_selector); - assert(key != null); - HandlerInfo hinfo = (HandlerInfo)key.attachment(); - key.cancel(); - handler = hinfo.handler; -//System.out.println("ThreadPool - remove fd = " + remove.fd); -//if (_removes != null) -// System.out.println("ThreadPool - more fds to be removed"); } - - if (handler == null) + else { java.nio.channels.SelectionKey key = null; while (_keysIter.hasNext()) @@ -537,7 +585,7 @@ catch (RuntimeException ex) // java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)_keysIter.next(); _keysIter.remove(); - if (k.isValid()) + if (k.isValid() && key != _fdIntrReadKey) { //System.out.println("ThreadPool - found a key"); key = k; @@ -557,41 +605,35 @@ catch (RuntimeException ex) continue repeatSelect; } - if (key.channel() == _fdIntrRead) - { -//System.out.println("ThreadPool - input ready on the interrupt pipe"); - shutdown = clearInterrupt(); - continue repeatSelect; - } - - HandlerInfo info = (HandlerInfo)key.attachment(); - assert(info != null); - handler = info.handler; + handler = (EventHandler)key.attachment(); } } assert(handler != null); - if (remove != null) + if (finished) { // - // Call finished() on the handler if necessary. + // Notify a handler about it's removal from the thread + // pool. // - if (remove.callFinished) - { - handler.finished(); - handler._stream.destroy(); - } + handler.finished(); + handler._stream.destroy(); synchronized (this) { - _handlers--; if (handler.server()) { + assert(_servers > 0); --_servers; } + else + { + assert(_clients > 0); + --_clients; + } //System.out.println("ThreadPool - _handlers = " + _handlers + ", _servers = " + _servers); - if (_handlers == 0 || _servers == 0) + if (_clients == 0 || _servers == 0) { notifyAll(); // For waitUntil...Finished() methods. } @@ -624,6 +666,7 @@ catch (RuntimeException ex) } catch (Ice.LocalException ex) { +//System.out.println("ThreadPool - informing handler about exception " + ex); handler.exception(ex); continue repeatSelect; } @@ -714,33 +757,18 @@ catch (RuntimeException ex) return true; } - private static final class HandlerInfo + private static final class FdHandlerPair { java.nio.channels.SelectableChannel fd; EventHandler handler; - java.nio.channels.SelectionKey key; - HandlerInfo next; - HandlerInfo(java.nio.channels.SelectableChannel fd, EventHandler handler) + FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler) { this.fd = fd; this.handler = handler; } } - private static final class RemoveInfo - { - java.nio.channels.SelectableChannel fd; - boolean callFinished; - RemoveInfo next; - - RemoveInfo(java.nio.channels.SelectableChannel fd, boolean callFinished) - { - this.fd = fd; - this.callFinished = callFinished; - } - } - private Instance _instance; private Ice.Logger _logger; private Ice.Properties _properties; @@ -751,9 +779,9 @@ catch (RuntimeException ex) private java.nio.channels.Selector _selector; private java.util.Set _keys; private java.util.Iterator _keysIter; - private HandlerInfo _adds; - private RemoveInfo _removes; + private java.util.LinkedList _changes = new java.util.LinkedList(); private int _handlers; + private int _clients; private int _servers; private int _timeout; private RecursiveMutex _threadMutex = new RecursiveMutex(); diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index b901fd2b4f4..d165a29b082 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -195,6 +195,7 @@ final class UdpTransceiver implements Transceiver try { _fd = Network.createUdpSocket(); + Network.setBlock(_fd, false); _addr = Network.getAddress(host, port); Network.doConnect(_fd, _addr, -1); _connect = false; // We're connected now @@ -226,6 +227,7 @@ final class UdpTransceiver implements Transceiver try { _fd = Network.createUdpSocket(); + Network.setBlock(_fd, false); java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host, port); _addr = Network.doBind(_fd, addr); |