diff options
author | Mark Spruiell <mes@zeroc.com> | 2006-05-15 19:08:14 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2006-05-15 19:08:14 +0000 |
commit | 785779393681eddac36a9e4b2f779c6794ca2b51 (patch) | |
tree | ab36984b576c97655a00ff99321ee7e5a28d7e65 /java/src/IceInternal | |
parent | Fixed typo in string conversion (diff) | |
download | ice-785779393681eddac36a9e4b2f779c6794ca2b51.tar.bz2 ice-785779393681eddac36a9e4b2f779c6794ca2b51.tar.xz ice-785779393681eddac36a9e4b2f779c6794ca2b51.zip |
thread pool changes to support Java5 SSL plugin
Diffstat (limited to 'java/src/IceInternal')
-rw-r--r-- | java/src/IceInternal/EventHandler.java | 5 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 3 | ||||
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 296 | ||||
-rw-r--r-- | java/src/IceInternal/Transceiver.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/UdpTransceiver.java | 4 |
6 files changed, 182 insertions, 136 deletions
diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index 7775685f433..a942e79d7d9 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -25,7 +25,10 @@ public abstract class EventHandler // Read data via the event handler. May only be called if // readable() returns true. // - abstract public void read(BasicStream is); + // NOTE: In Java, read returns true if the handler has more data + // data available, and therefore read should be called again. + // + abstract public boolean read(BasicStream is); // // A complete message has been received. diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index df032a220da..86d67671eef 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -207,11 +207,12 @@ public final class IncomingConnectionFactory extends EventHandler return false; } - public void + public boolean read(BasicStream unused) { assert(!_instance.threadPerConnection()); // Only for use with a thread pool. assert(false); // Must not be called. + return false; } public void diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index f435be5f76f..e2e7418d22f 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -215,7 +215,7 @@ final class TcpTransceiver implements Transceiver } } - public void + public boolean read(BasicStream stream, int timeout) { java.nio.ByteBuffer buf = stream.prepareRead(); @@ -307,6 +307,8 @@ final class TcpTransceiver implements Transceiver throw se; } } + + return false; } public String diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index e0fb807d01a..5538bc9af22 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -449,7 +449,7 @@ public final class ThreadPool } } - while(true) + while(true) { if(TRACE_REGISTRATION) { @@ -462,160 +462,175 @@ public final class ThreadPool trace(" " + key.channel()); } } - - select(); EventHandler handler = null; + + // + // Only call select() if there are no pending handlers with additional data + // for us to read. + // + if(!_pendingHandlers.isEmpty()) + { + handler = (EventHandler)_pendingHandlers.removeFirst(); + } + else + { + select(); + } + boolean finished = false; boolean shutdown = false; - synchronized(this) + if(handler == null) { - if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. - { - if(TRACE_SELECT) - { - trace("timeout"); - } - - assert(_timeout > 0); - _timeout = 0; - shutdown = true; - } - else + synchronized(this) { - if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) + if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. { - if(TRACE_SELECT || TRACE_INTERRUPT) + if(TRACE_SELECT) { - trace("detected interrupt"); + trace("timeout"); } - // - // There are two possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. - // - // 2. An event handler was registered or unregistered. - // - - // - // Thread pool destroyed? - // - if(_destroyed) + assert(_timeout > 0); + _timeout = 0; + shutdown = true; + } + else + { + if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) { - if(TRACE_SHUTDOWN) + if(TRACE_SELECT || TRACE_INTERRUPT) { - trace("destroyed, thread id = " + Thread.currentThread()); + trace("detected interrupt"); } // - // Don't clear the interrupt fd if - // destroyed, so that the other threads - // exit as well. + // There are two possiblities for an interrupt: // - return true; - } - - // - // Remove the interrupt channel from the - // selected key set. - // - _keys.remove(_fdIntrReadKey); - - clearInterrupt(); - - // - // An event handler must have been registered - // or unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. - { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else + // 1. The thread pool has been destroyed. + // + // 2. An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if(_destroyed) { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } + if(TRACE_SHUTDOWN) + { + trace("destroyed, thread id = " + Thread.currentThread()); + } - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); + // + // Don't clear the interrupt fd if + // destroyed, so that the other threads + // exit as well. + // + return true; } - catch(java.nio.channels.ClosedChannelException ex) + + // + // Remove the interrupt channel from the + // selected key set. + // + _keys.remove(_fdIntrReadKey); + + clearInterrupt(); + + // + // An event handler must have been registered + // or unregistered. + // + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if(change.handler != null) // Addition if handler is set. { - assert(false); + int op; + if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + { + op = java.nio.channels.SelectionKey.OP_READ; + } + else + { + op = java.nio.channels.SelectionKey.OP_ACCEPT; + } + + java.nio.channels.SelectionKey key = null; + try + { + key = change.fd.register(_selector, op, change.handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } + _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); + + if(TRACE_REGISTRATION) + { + trace("added handler (" + change.handler.getClass().getName() + ") for fd " + + change.fd); + } + + continue; } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - if(TRACE_REGISTRATION) + else // Removal if handler is not set. { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); + HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); + assert(pair != null); + handler = pair.handler; + finished = true; + pair.key.cancel(); + + if(TRACE_REGISTRATION) + { + trace("removed handler (" + handler.getClass().getName() + ") for fd " + + change.fd); + } + + // Don't continue; we have to call + // finished() on the event handler below, + // outside the thread synchronization. } - - continue; } - else // Removal if handler is not set. + else { - HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) + java.nio.channels.SelectionKey key = null; + java.util.Iterator iter = _keys.iterator(); + while(iter.hasNext()) { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + - change.fd); + // + // Ignore selection keys that have been cancelled + // + java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next(); + iter.remove(); + if(k.isValid() && key != _fdIntrReadKey) + { + if(TRACE_SELECT) + { + trace("found a key: " + keyToString(k)); + } + + key = k; + break; + } } - - // Don't continue; we have to call - // finished() on the event handler below, - // outside the thread synchronization. - } - } - else - { - java.nio.channels.SelectionKey key = null; - java.util.Iterator iter = _keys.iterator(); - while(iter.hasNext()) - { - // - // Ignore selection keys that have been cancelled - // - java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next(); - iter.remove(); - if(k.isValid() && key != _fdIntrReadKey) + + if(key == null) { if(TRACE_SELECT) { - trace("found a key: " + keyToString(k)); + trace("didn't find a valid key"); } - key = k; - break; - } - } - - if(key == null) - { - if(TRACE_SELECT) - { - trace("didn't find a valid key"); + continue; } - continue; + handler = (EventHandler)key.attachment(); } - - handler = (EventHandler)key.attachment(); } } } @@ -660,7 +675,7 @@ public final class ThreadPool if(finished) { // - // Notify a handler about it's removal from + // Notify a handler about its removal from // the thread pool. // try @@ -697,7 +712,14 @@ public final class ThreadPool { try { - read(handler); + // + // If read returns true, the handler has more data for the thread pool + // to process. + // + if(read(handler)) + { + _pendingHandlers.add(handler); + } } catch(Ice.TimeoutException ex) // Expected. { @@ -705,7 +727,7 @@ public final class ThreadPool } catch(Ice.DatagramLimitException ex) // Expected. { - continue; + continue; } catch(Ice.SocketException ex) { @@ -721,12 +743,12 @@ public final class ThreadPool } catch(Ice.LocalException ex) { - if(handler.datagram()) + if(handler.datagram()) { if(_instance.initializationData().properties.getPropertyAsInt( - "Ice.Warn.Connections") > 0) + "Ice.Warn.Connections") > 0) { - _instance.initializationData().logger.warning( + _instance.initializationData().logger.warning( "datagram connection exception:\n" + ex + "\n" + handler.toString()); } } @@ -734,9 +756,9 @@ public final class ThreadPool { if(TRACE_EXCEPTION) { - trace("informing handler (" + handler.getClass().getName() + + trace("informing handler (" + handler.getClass().getName() + ") about exception " + ex); - ex.printStackTrace(); + ex.printStackTrace(); } handler.exception(ex); @@ -749,7 +771,7 @@ public final class ThreadPool } // - // Provide a new mesage to the handler. + // Provide a new message to the handler. // try { @@ -885,9 +907,11 @@ public final class ThreadPool } } - private void + private boolean read(EventHandler handler) { + boolean moreData = false; + BasicStream stream = handler._stream; if(stream.size() == 0) @@ -898,7 +922,7 @@ public final class ThreadPool if(stream.pos() != stream.size()) { - handler.read(stream); + moreData = handler.read(stream); assert(stream.pos() == stream.size()); } @@ -980,10 +1004,12 @@ public final class ThreadPool } else { - handler.read(stream); + moreData = handler.read(stream); assert(stream.pos() == stream.size()); } } + + return moreData; } /* @@ -1184,6 +1210,14 @@ public final class ThreadPool private int _timeout; + // + // Since the Java5 SSL transceiver can read more data from the socket than is + // actually requested, we have to keep a separate list of handlers that need + // the thread pool to read more data before it re-enters a blocking call to + // select(). + // + private java.util.LinkedList _pendingHandlers = new java.util.LinkedList(); + private final class EventHandlerThread extends Thread { EventHandlerThread(String name) diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index b23b6e24240..4934b6cdfe0 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -16,7 +16,11 @@ public interface Transceiver void shutdownWrite(); void shutdownReadWrite(); void write(BasicStream stream, int timeout); - void read(BasicStream stream, int timeout); + // + // NOTE: In Java, read() returns a boolean to indicate whether the transceiver + // has read more data than requested. + // + boolean read(BasicStream stream, int timeout); String type(); String toString(); } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 9c324fc8c90..335a23329ec 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -118,7 +118,7 @@ final class UdpTransceiver implements Transceiver } } - public void + public boolean read(BasicStream stream, int timeout) // NOTE: timeout is not used { assert(stream.pos() == 0); @@ -218,6 +218,8 @@ final class UdpTransceiver implements Transceiver stream.resize(ret, true); stream.pos(ret); + + return false; } public String |