diff options
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 177 |
1 files changed, 131 insertions, 46 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index b08b4bbeaea..724c261e23f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -37,12 +37,14 @@ public final class ThreadPool public void promoteFollower() { +//System.out.println("ThreadPool - promote follower - lock count = " + _threadMutex.count()); _threadMutex.unlock(); } public synchronized void initiateServerShutdown() { +//System.out.println("ThreadPool - initiate server shutdown"); java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); buf.put(0, (byte)1); try @@ -154,24 +156,15 @@ public final class ThreadPool _servers = 0; _timeout = 0; - try - { - java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open(); - _fdIntrRead = pipe.source(); - _fdIntrWrite = pipe.sink(); - _fdIntrRead.configureBlocking(false); - } - catch (java.io.IOException ex) - { - Ice.SystemException sys = new Ice.SystemException(); - sys.initCause(ex); - throw sys; - } + Network.SocketPair pair = Network.createPipe(); + _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; + _fdIntrWrite = pair.sink; try { _selector = java.nio.channels.Selector.open(); - _fdIntrReadKey = _fdIntrRead.register(_selector, java.nio.channels.SelectionKey.OP_READ); + pair.source.configureBlocking(false); + _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); } catch (java.io.IOException ex) { @@ -180,6 +173,23 @@ public final class ThreadPool throw sys; } + // + // The Selector holds a Set representing the selected keys. The + // Set reference doesn't change, so we obtain it once here. + // + _keys = _selector.selectedKeys(); + + // + // On Win32 platforms, once a key has been selected, it will not + // be reported again until the channel associated with that key + // has been processed. This means that we must process all of + // the keys in the selected-key set before calling select again. + // If the iterator _keysIter is null, it indicates that select needs + // to be called. Otherwise, we need to remove the next active key + // (if any) from the iterator. + // + _keysIter = null; + String value = _instance.properties().getProperty("Ice.ServerIdleTime"); if (value != null) { @@ -288,6 +298,7 @@ public final class ThreadPool synchronized void destroy() { +//System.out.println("ThreadPool - destroy"); assert(!_destroyed); _destroyed = true; setInterrupt(); @@ -296,8 +307,8 @@ public final class ThreadPool private boolean clearInterrupt() { +//System.out.println("clearInterrupt"); /* -System.out.println("clearInterrupt"); try { throw new RuntimeException(); @@ -319,7 +330,7 @@ catch (RuntimeException ex) break; } -//System.out.println(" got byte " + (int)buf.get(0)); +//System.out.println(" clearInterrupt - got byte " + (int)buf.get(0)); if (buf.get(0) == (byte)1) // Shutdown initiated? { shutdown = true; @@ -336,8 +347,8 @@ catch (RuntimeException ex) private void setInterrupt() { +//System.out.println("setInterrupt"); /* -System.out.println("setInterrupt"); try { throw new RuntimeException(); @@ -346,7 +357,7 @@ catch (RuntimeException ex) { ex.printStackTrace(); } -*/ +//*/ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); buf.put(0, (byte)0); try @@ -364,11 +375,21 @@ catch (RuntimeException ex) { boolean shutdown = false; final int timeoutMillis = _timeout * 1000; - java.util.Set keys = _selector.selectedKeys(); + + // + // On Win32 platforms, select() occasionally returns 0 when it + // is supposed to block indefinitely. As a workaround, we only + // treat this occurrence as a timeout if we have a timeout value, + // and if the proper amount of time has elapsed. This can be a + // local variable because a timeout would not be split over + // multiple threads. + // + long nextTimeout = 0; while (true) { _threadMutex.lock(); +//System.out.println("ThreadPool - thread " + Thread.currentThread() + " has the lock"); repeatSelect: @@ -381,30 +402,59 @@ catch (RuntimeException ex) _instance.objectAdapterFactory().shutdown(); } - keys.clear(); - int ret = 0; - try - { - ret = _selector.select(timeoutMillis); - } - catch (java.io.InterruptedIOException ex) + if (_keysIter == null) // Need to select. { - continue repeatSelect; + int ret = 0; + try + { +//System.out.println("ThreadPool - selecting on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread()); + if (timeoutMillis > 0 && nextTimeout == 0) + { + nextTimeout = System.currentTimeMillis() + timeoutMillis; + } + + ret = _selector.select(timeoutMillis); + } + catch (java.io.InterruptedIOException ex) + { + continue repeatSelect; + } + catch (java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + +//System.out.println("ThreadPool - select() returned " + ret + ", _keys.size() = " + _keys.size()); + if (ret == 0) // Potential timeout. + { + if (_timeout > 0) + { + long now = System.currentTimeMillis(); + if (now >= nextTimeout) // Timeout. + { +//System.out.println("ThreadPool - timeout"); + _timeout = 0; + shutdown = true; + nextTimeout = 0; + } + } +//System.out.println("ThreadPool - timeout workaround"); + continue repeatSelect; + } + + nextTimeout = 0; } - catch (java.io.IOException ex) + else { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; +//System.out.println("ThreadPool - still have keys"); } -//System.out.println("Select() returned " + ret); - if (ret == 0) // Timeout. + if (_keysIter == null) { - assert(_timeout > 0); - _timeout = 0; - shutdown = true; - continue repeatSelect; +//System.out.println("ThreadPool - initializing _keysIter"); + _keysIter = _keys.iterator(); } EventHandler handler = null; @@ -413,7 +463,7 @@ catch (RuntimeException ex) { if (_destroyed) { -//System.out.println("ThreadPool - destroyed"); +//System.out.println("ThreadPool - destroyed, thread id = " + Thread.currentThread()); // // Don't clear the interrupt fd if destroyed, so that // the other threads exit as well. @@ -477,6 +527,7 @@ catch (RuntimeException ex) } _handlers--; info = info.next; +//System.out.println("ThreadPool - _handlers = " + _handlers + ", _servers = " + _servers); } _removes = null; @@ -494,20 +545,27 @@ catch (RuntimeException ex) } java.nio.channels.SelectionKey key = null; - java.util.Iterator i = keys.iterator(); - while (i.hasNext()) + while (_keysIter.hasNext()) { // // Ignore selection keys that have been cancelled // - java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)i.next(); + java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)_keysIter.next(); + _keysIter.remove(); if (k.isValid()) { +//System.out.println("ThreadPool - found a key"); key = k; break; } } + if (!_keysIter.hasNext()) + { + _keysIter = null; +//System.out.println("ThreadPool - reset iterator"); + } + if (key == null) { //System.out.println("ThreadPool - didn't find a valid key"); @@ -529,12 +587,20 @@ catch (RuntimeException ex) // // If the handler is "readable", try to read a message. // + // NOTE: On Win32 platforms, select may report a channel + // as readable although nothing can be read. We want to + // ignore the event in this case. + // BasicStream stream = new BasicStream(_instance); if (handler.readable()) { try { - read(handler); + if (!read(handler)) // No data available. + { +//System.out.println("ThreadPool - no input"); + continue repeatSelect; + } } catch (Ice.TimeoutException ex) // Expected { @@ -557,7 +623,7 @@ catch (RuntimeException ex) } } - private void + private boolean read(EventHandler handler) { BasicStream stream = handler._stream; @@ -570,7 +636,21 @@ catch (RuntimeException ex) if (stream.pos() != stream.size()) { - handler.read(stream); + // + // On Win32 platforms, the selector may select a channel for + // reading even though no data is available. Therefore, we + // first try to read non-blocking; if we don't get any + // data, we ignore the read event. + // + boolean doRead = handler.tryRead(stream); + if (stream.pos() == 0) + { + return false; + } + if (doRead) + { + handler.read(stream); + } assert(stream.pos() == stream.size()); } @@ -608,6 +688,8 @@ catch (RuntimeException ex) handler.read(stream); assert(stream.pos() == stream.size()); } + + return true; } private static final class HandlerInfo @@ -639,10 +721,12 @@ catch (RuntimeException ex) private Instance _instance; private boolean _destroyed; - private java.nio.channels.Pipe.SourceChannel _fdIntrRead; + private java.nio.channels.ReadableByteChannel _fdIntrRead; private java.nio.channels.SelectionKey _fdIntrReadKey; - private java.nio.channels.Pipe.SinkChannel _fdIntrWrite; + private java.nio.channels.WritableByteChannel _fdIntrWrite; private java.nio.channels.Selector _selector; + private java.util.Set _keys; + private java.util.Iterator _keysIter; private HandlerInfo _adds; private RemoveInfo _removes; private int _handlers; @@ -702,6 +786,7 @@ catch (RuntimeException ex) } } +//System.out.println("ThreadPool - run() terminated - promoting follower"); _pool.promoteFollower(); _pool = null; // Break cyclic dependency. } |