diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
commit | c6dbd090d9691cc0116a2967b2827b858b184dfe (patch) | |
tree | 6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /java/src/IceInternal/ThreadPool.java | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2 ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip |
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 660 |
1 files changed, 142 insertions, 518 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index f38ea735b6f..6f19615ecf0 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -26,11 +26,13 @@ public final class ThreadPool _destroyed = false; _prefix = prefix; _timeout = timeout; + _selector = new Selector(instance, timeout); _threadIndex = 0; _running = 0; _inUse = 0; _load = 1.0; _promote = true; + _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0; _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); @@ -43,29 +45,6 @@ public final class ThreadPool _programNamePrefix = ""; } - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; - - try - { - _selector = java.nio.channels.Selector.open(); - pair.source.configureBlocking(false); - _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); - } - catch(java.io.IOException ex) - { - Ice.SyscallException sys = new Ice.SyscallException(); - sys.initCause(ex); - throw sys; - } - - // - // The Selector holds a Set representing the selected keys. The - // Set reference doesn't change, so we obtain it once here. - // - _keys = _selector.selectedKeys(); - // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and @@ -144,53 +123,71 @@ public final class ThreadPool } assert(!_destroyed); - assert(_handlerMap.isEmpty()); _destroyed = true; - setInterrupt(); + _selector.setInterrupt(); } public synchronized void - _register(java.nio.channels.SelectableChannel fd, EventHandler handler) + _register(EventHandler handler) { - if(TRACE_REGISTRATION) + assert(!_destroyed); + + if(!handler._registered) { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd); + if(TRACE_REGISTRATION) + { + trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd()); + } + + if(!handler._serializing) + { + _selector.add(handler, SocketStatus.NeedRead); + } + handler._registered = true; } - assert(!_destroyed); - _changes.add(new FdHandlerPair(fd, handler)); - setInterrupt(); } public synchronized void - unregister(java.nio.channels.SelectableChannel fd) + unregister(EventHandler handler) { - if(TRACE_REGISTRATION) + assert(!_destroyed); + if(handler._registered) { - if(TRACE_STACK_TRACE) + if(TRACE_REGISTRATION) { - java.io.StringWriter sw = new java.io.StringWriter(); - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - } - trace("removing handler for channel " + fd + "\n" + sw.toString()); + trace("removing handler for channel " + handler.fd()); } - else + + if(!handler._serializing) { - trace("removing handler for channel " + fd); + _selector.remove(handler); } + handler._registered = false; } + } + public synchronized void + finish(EventHandler handler) + { assert(!_destroyed); - _changes.add(new FdHandlerPair(fd, null)); - setInterrupt(); - } + + if(TRACE_REGISTRATION) + { + trace("finishing handler for channel " + handler.fd()); + } + + if(handler._registered) + { + if(!handler._serializing) + { + _selector.remove(handler); + } + handler._registered = false; + } + + _finished.add(handler); + _selector.setInterrupt(); + } public synchronized void execute(ThreadPoolWorkItem workItem) @@ -200,16 +197,25 @@ public final class ThreadPool throw new Ice.CommunicatorDestroyedException(); } _workItems.add(workItem); - setInterrupt(); + _selector.setInterrupt(); } public void - promoteFollower() + promoteFollower(EventHandler handler) { if(_sizeMax > 1) { synchronized(this) { + if(_serialize && handler != null) + { + handler._serializing = true; + if(handler._registered) + { + _selector.remove(handler); + } + } + assert(!_promote); _promote = true; notify(); @@ -280,65 +286,9 @@ public final class ThreadPool } // - // Cleanup the selector, and the socket pair. + // Destroy the selector // - try - { - if(_selector != null) - { - try - { - _selector.close(); - } - catch(java.io.IOException ex) - { - // - // BUGFIX: - // - // Ignore this exception. This shouldn't happen - // but for some reasons the close() call raises - // "java.io.IOException: Bad file descriptor" on - // Mac OS X 10.3.x (it works fine on OS X 10.4.x) - // - } - _selector = null; - } - - if(_fdIntrWrite != null) - { - try - { - _fdIntrWrite.close(); - } - catch(java.io.IOException ex) - { - // - // BUGFIX: - // - // Ignore this exception. This shouldn't happen - // but for some reasons the close() call raises - // "java.io.IOException: No such file or - // directory" under Linux with JDK 1.4.2. - // - } - _fdIntrWrite = null; - } - - if(_fdIntrRead != null) - { - _fdIntrRead.close(); - _fdIntrRead = null; - } - } - catch(java.io.IOException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } + _selector.destroy(); } public String @@ -346,91 +296,6 @@ public final class ThreadPool { return _prefix; } - - private void - clearInterrupt() - { - if(TRACE_INTERRUPT) - { - trace("clearInterrupt"); - if(TRACE_STACK_TRACE) - { - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - ex.printStackTrace(); - } - } - } - - byte b = 0; - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - try - { - while(true) - { - buf.rewind(); - if(_fdIntrRead.read(buf) != 1) - { - break; - } - - if(TRACE_INTERRUPT) - { - trace("clearInterrupt got byte " + (int)buf.get(0)); - } - - b = buf.get(0); - break; - } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - private void - setInterrupt() - { - if(TRACE_INTERRUPT) - { - trace("setInterrupt()"); - if(TRACE_STACK_TRACE) - { - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - ex.printStackTrace(); - } - } - } - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) - { - try - { - _fdIntrWrite.write(buf); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - } // // Each thread supplies a BasicStream, to avoid creating excessive @@ -465,206 +330,85 @@ public final class ThreadPool while(true) { - if(TRACE_REGISTRATION) - { - java.util.Set<java.nio.channels.SelectionKey> keys = _selector.keys(); - trace("selecting on " + keys.size() + " channels:"); - java.util.Iterator<java.nio.channels.SelectionKey> i = keys.iterator(); - while(i.hasNext()) - { - java.nio.channels.SelectionKey key = i.next(); - trace(" " + key.channel()); - } - } - - EventHandler handler = null; - ThreadPoolWorkItem workItem = null; - - // - // Only call select() if there are no pending handlers with additional data - // for us to read. - // - if(!_pendingHandlers.isEmpty()) + try { - handler = _pendingHandlers.removeFirst(); + _selector.select(); } - else + catch(java.io.IOException ex) { - select(); + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + //throw se; + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + se.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "':\n" + sw.toString(); + _instance.initializationData().logger.error(s); + continue; } + EventHandler handler = null; + ThreadPoolWorkItem workItem = null; boolean finished = false; boolean shutdown = false; - if(handler == null) + synchronized(this) { - synchronized(this) + if(_selector.checkTimeout()) + { + assert(_timeout > 0); + shutdown = true; + } + else if(_selector.isInterrupted()) { - if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. + if(_selector.processInterrupt()) { - if(TRACE_SELECT) - { - trace("timeout"); - } - - assert(_timeout > 0); - _timeout = 0; - shutdown = true; + continue; + } + + // + // There are three possiblities for an interrupt: + // + // 1. The thread pool has been destroyed. + // + // 2. An event handler is being finished. + // + // 3. A work item has been scheduled. + // + + if(!_finished.isEmpty()) + { + _selector.clearInterrupt(); + handler = _finished.removeFirst(); + finished = true; + } + else if(!_workItems.isEmpty()) + { + // + // Work items must be executed first even if the thread pool is destroyed. + // + _selector.clearInterrupt(); + workItem = _workItems.removeFirst(); + } + else if(_destroyed) + { + // + // Don't clear the interrupt if destroyed, so that the other threads exit as well. + // + return true; } else { - if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) - { - if(TRACE_SELECT || TRACE_INTERRUPT) - { - trace("detected interrupt"); - } - - // - // There are three possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. - // - // 2. An event handler was registered or unregistered. - // - // 3. A work item has been scheduled. - // - - if(!_workItems.isEmpty()) - { - // - // Work items must be executed first even if the thread pool is destroyed. - // - - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); - clearInterrupt(); - assert(!_workItems.isEmpty()); - workItem = _workItems.removeFirst(); - } - else if(_destroyed) - { - if(TRACE_SHUTDOWN) - { - trace("destroyed, thread id = " + Thread.currentThread()); - } - - // - // Don't clear the interrupt fd if destroyed, so that the other threads exit as well. - // - return true; - } - else - { - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); - clearInterrupt(); - - // - // An event handler must have been registered or unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = _changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. - { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else - { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } - - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - // - // If the handler is readable and already has some data to read add it - // to the _pendingHandlers list to ensure it will be processed. - // - if(change.handler.readable() && change.handler.hasMoreData()) - { - _pendingHandlers.add(change.handler); - } - - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } - - continue; - } - else // Removal if handler is not set. - { - HandlerKeyPair pair = _handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + - change.fd); - } - - // Don't continue; we have to call - // finished() on the event handler below, - // outside the thread synchronization. - } - } - } - else - { - java.nio.channels.SelectionKey key = null; - java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator(); - while(iter.hasNext()) - { - // - // Ignore selection keys that have been cancelled - // - java.nio.channels.SelectionKey k = iter.next(); - iter.remove(); - if(k.isValid() && k != _fdIntrReadKey) - { - if(TRACE_SELECT) - { - trace("found a key: " + keyToString(k)); - } - - key = k; - break; - } - } - - if(key == null) - { - if(TRACE_SELECT) - { - trace("didn't find a valid key"); - } - - continue; - } - - handler = (EventHandler)key.attachment(); - } + assert(false); + } + } + else + { + handler = (EventHandler)_selector.getNextSelected(); + if(handler == null) + { + continue; } } } @@ -675,11 +419,6 @@ public final class ThreadPool if(shutdown) { - if(TRACE_SHUTDOWN) - { - trace("shutdown detected"); - } - // // Initiate server shutdown. // @@ -693,7 +432,7 @@ public final class ThreadPool continue; } - promoteFollower(); + promoteFollower(null); factory.shutdown(); // @@ -732,8 +471,7 @@ public final class ThreadPool if(finished) { // - // Notify a handler about its removal from - // the thread pool. + // Notify a handler about its removal from the thread pool. // try { @@ -773,14 +511,10 @@ public final class ThreadPool { continue; // Can't read without blocking. } - - // - // If the handler has more data to process add it to the _pendingHandlers list - // to ensure it will be processed. - // + if(handler.hasMoreData()) { - _pendingHandlers.add(handler); + _selector.hasMoreData(handler); } } catch(Ice.TimeoutException ex) @@ -871,6 +605,15 @@ public final class ThreadPool { if(!_destroyed) { + if(_serialize && handler != null && handler._serializing) + { + if(handler._registered) + { + _selector.add(handler, SocketStatus.NeedRead); + } + handler._serializing = false; + } + // // First we reap threads that have been // destroyed before. @@ -1146,84 +889,6 @@ public final class ThreadPool */ private void - select() - { - int ret = 0; - int spuriousWakeUp = 0; - while(true) - { - try - { - if(TRACE_SELECT) - { - trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread()); - } - - if(_timeout > 0) - { - ret = _selector.select(_timeout * 1000); - } - else - { - ret = _selector.select(); - } - } - catch(java.io.IOException ex) - { - // - // Pressing Ctrl-C causes select() to raise an - // IOException, which seems like a JDK bug. We trap - // for that special case here and ignore it. - // Hopefully we're not masking something important! - // - if(Network.interrupted(ex)) - { - continue; - } - - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; - } - - if(TRACE_SELECT) - { - trace("select() returned " + ret + ", _keys.size() = " + _keys.size()); - } - - if(ret == 0 && _timeout <= 0) - { - // - // This is necessary to prevent a busy loop in case of a spurious wake-up which - // sometime occurs in the client thread pool when the communicator is destroyed. - // If there are too many successive spurious wake-ups, we log an error. - // - try - { - Thread.currentThread().sleep(1); - } - catch(java.lang.InterruptedException ex) - { - } - - if(++spuriousWakeUp > 100) - { - _instance.initializationData().logger.error("spurious selector wake up in `" + _prefix + "'"); - } - } - - break; - } - } - - private void trace(String msg) { System.err.println(_prefix + ": " + msg); @@ -1253,57 +918,15 @@ public final class ThreadPool return key.channel() + " " + ops; } - private static final class FdHandlerPair - { - java.nio.channels.SelectableChannel fd; - EventHandler handler; - - FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler) - { - this.fd = fd; - this.handler = handler; - } - } - - private static final class HandlerKeyPair - { - EventHandler handler; - java.nio.channels.SelectionKey key; - - HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key) - { - this.handler = handler; - this.key = key; - } - } - private Instance _instance; private boolean _destroyed; private final String _prefix; private final String _programNamePrefix; - - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.SelectionKey _fdIntrReadKey; - private java.nio.channels.WritableByteChannel _fdIntrWrite; - private java.nio.channels.Selector _selector; - private java.util.Set<java.nio.channels.SelectionKey> _keys; - - private java.util.LinkedList<FdHandlerPair> _changes = new java.util.LinkedList<FdHandlerPair>(); + private final Selector _selector; private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - - private java.util.Map<java.nio.channels.SelectableChannel, HandlerKeyPair> _handlerMap = - new java.util.HashMap<java.nio.channels.SelectableChannel, HandlerKeyPair>(); - + private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>(); private int _timeout; - // - // Since the Java5 SSL transceiver can read more data from the socket than is - // actually requested, we have to keep a separate list of handlers that need - // the thread pool to read more data before it re-enters a blocking call to - // select(). - // - private java.util.LinkedList<EventHandler> _pendingHandlers = new java.util.LinkedList<EventHandler>(); - private final class EventHandlerThread extends Thread { EventHandlerThread(String name) @@ -1377,6 +1000,7 @@ public final class ThreadPool private final int _size; // Number of threads that are pre-created. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. + private final boolean _serialize; // True if requests need to be serialized over the connection. private java.util.List<EventHandlerThread> _threads; // All threads, running or not. private int _threadIndex; // For assigning thread names. |