summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/src/IceInternal/ThreadPool.java177
1 files changed, 131 insertions, 46 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index b08b4bbeaea..724c261e23f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -37,12 +37,14 @@ public final class ThreadPool
public void
promoteFollower()
{
+//System.out.println("ThreadPool - promote follower - lock count = " + _threadMutex.count());
_threadMutex.unlock();
}
public synchronized void
initiateServerShutdown()
{
+//System.out.println("ThreadPool - initiate server shutdown");
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
buf.put(0, (byte)1);
try
@@ -154,24 +156,15 @@ public final class ThreadPool
_servers = 0;
_timeout = 0;
- try
- {
- java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open();
- _fdIntrRead = pipe.source();
- _fdIntrWrite = pipe.sink();
- _fdIntrRead.configureBlocking(false);
- }
- catch (java.io.IOException ex)
- {
- Ice.SystemException sys = new Ice.SystemException();
- sys.initCause(ex);
- throw sys;
- }
+ Network.SocketPair pair = Network.createPipe();
+ _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
+ _fdIntrWrite = pair.sink;
try
{
_selector = java.nio.channels.Selector.open();
- _fdIntrReadKey = _fdIntrRead.register(_selector, java.nio.channels.SelectionKey.OP_READ);
+ pair.source.configureBlocking(false);
+ _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
}
catch (java.io.IOException ex)
{
@@ -180,6 +173,23 @@ public final class ThreadPool
throw sys;
}
+ //
+ // The Selector holds a Set representing the selected keys. The
+ // Set reference doesn't change, so we obtain it once here.
+ //
+ _keys = _selector.selectedKeys();
+
+ //
+ // On Win32 platforms, once a key has been selected, it will not
+ // be reported again until the channel associated with that key
+ // has been processed. This means that we must process all of
+ // the keys in the selected-key set before calling select again.
+ // If the iterator _keysIter is null, it indicates that select needs
+ // to be called. Otherwise, we need to remove the next active key
+ // (if any) from the iterator.
+ //
+ _keysIter = null;
+
String value = _instance.properties().getProperty("Ice.ServerIdleTime");
if (value != null)
{
@@ -288,6 +298,7 @@ public final class ThreadPool
synchronized void
destroy()
{
+//System.out.println("ThreadPool - destroy");
assert(!_destroyed);
_destroyed = true;
setInterrupt();
@@ -296,8 +307,8 @@ public final class ThreadPool
private boolean
clearInterrupt()
{
+//System.out.println("clearInterrupt");
/*
-System.out.println("clearInterrupt");
try
{
throw new RuntimeException();
@@ -319,7 +330,7 @@ catch (RuntimeException ex)
break;
}
-//System.out.println(" got byte " + (int)buf.get(0));
+//System.out.println(" clearInterrupt - got byte " + (int)buf.get(0));
if (buf.get(0) == (byte)1) // Shutdown initiated?
{
shutdown = true;
@@ -336,8 +347,8 @@ catch (RuntimeException ex)
private void
setInterrupt()
{
+//System.out.println("setInterrupt");
/*
-System.out.println("setInterrupt");
try
{
throw new RuntimeException();
@@ -346,7 +357,7 @@ catch (RuntimeException ex)
{
ex.printStackTrace();
}
-*/
+//*/
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
buf.put(0, (byte)0);
try
@@ -364,11 +375,21 @@ catch (RuntimeException ex)
{
boolean shutdown = false;
final int timeoutMillis = _timeout * 1000;
- java.util.Set keys = _selector.selectedKeys();
+
+ //
+ // On Win32 platforms, select() occasionally returns 0 when it
+ // is supposed to block indefinitely. As a workaround, we only
+ // treat this occurrence as a timeout if we have a timeout value,
+ // and if the proper amount of time has elapsed. This can be a
+ // local variable because a timeout would not be split over
+ // multiple threads.
+ //
+ long nextTimeout = 0;
while (true)
{
_threadMutex.lock();
+//System.out.println("ThreadPool - thread " + Thread.currentThread() + " has the lock");
repeatSelect:
@@ -381,30 +402,59 @@ catch (RuntimeException ex)
_instance.objectAdapterFactory().shutdown();
}
- keys.clear();
- int ret = 0;
- try
- {
- ret = _selector.select(timeoutMillis);
- }
- catch (java.io.InterruptedIOException ex)
+ if (_keysIter == null) // Need to select.
{
- continue repeatSelect;
+ int ret = 0;
+ try
+ {
+//System.out.println("ThreadPool - selecting on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread());
+ if (timeoutMillis > 0 && nextTimeout == 0)
+ {
+ nextTimeout = System.currentTimeMillis() + timeoutMillis;
+ }
+
+ ret = _selector.select(timeoutMillis);
+ }
+ catch (java.io.InterruptedIOException ex)
+ {
+ continue repeatSelect;
+ }
+ catch (java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+
+//System.out.println("ThreadPool - select() returned " + ret + ", _keys.size() = " + _keys.size());
+ if (ret == 0) // Potential timeout.
+ {
+ if (_timeout > 0)
+ {
+ long now = System.currentTimeMillis();
+ if (now >= nextTimeout) // Timeout.
+ {
+//System.out.println("ThreadPool - timeout");
+ _timeout = 0;
+ shutdown = true;
+ nextTimeout = 0;
+ }
+ }
+//System.out.println("ThreadPool - timeout workaround");
+ continue repeatSelect;
+ }
+
+ nextTimeout = 0;
}
- catch (java.io.IOException ex)
+ else
{
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
+//System.out.println("ThreadPool - still have keys");
}
-//System.out.println("Select() returned " + ret);
- if (ret == 0) // Timeout.
+ if (_keysIter == null)
{
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
- continue repeatSelect;
+//System.out.println("ThreadPool - initializing _keysIter");
+ _keysIter = _keys.iterator();
}
EventHandler handler = null;
@@ -413,7 +463,7 @@ catch (RuntimeException ex)
{
if (_destroyed)
{
-//System.out.println("ThreadPool - destroyed");
+//System.out.println("ThreadPool - destroyed, thread id = " + Thread.currentThread());
//
// Don't clear the interrupt fd if destroyed, so that
// the other threads exit as well.
@@ -477,6 +527,7 @@ catch (RuntimeException ex)
}
_handlers--;
info = info.next;
+//System.out.println("ThreadPool - _handlers = " + _handlers + ", _servers = " + _servers);
}
_removes = null;
@@ -494,20 +545,27 @@ catch (RuntimeException ex)
}
java.nio.channels.SelectionKey key = null;
- java.util.Iterator i = keys.iterator();
- while (i.hasNext())
+ while (_keysIter.hasNext())
{
//
// Ignore selection keys that have been cancelled
//
- java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)i.next();
+ java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)_keysIter.next();
+ _keysIter.remove();
if (k.isValid())
{
+//System.out.println("ThreadPool - found a key");
key = k;
break;
}
}
+ if (!_keysIter.hasNext())
+ {
+ _keysIter = null;
+//System.out.println("ThreadPool - reset iterator");
+ }
+
if (key == null)
{
//System.out.println("ThreadPool - didn't find a valid key");
@@ -529,12 +587,20 @@ catch (RuntimeException ex)
//
// If the handler is "readable", try to read a message.
//
+ // NOTE: On Win32 platforms, select may report a channel
+ // as readable although nothing can be read. We want to
+ // ignore the event in this case.
+ //
BasicStream stream = new BasicStream(_instance);
if (handler.readable())
{
try
{
- read(handler);
+ if (!read(handler)) // No data available.
+ {
+//System.out.println("ThreadPool - no input");
+ continue repeatSelect;
+ }
}
catch (Ice.TimeoutException ex) // Expected
{
@@ -557,7 +623,7 @@ catch (RuntimeException ex)
}
}
- private void
+ private boolean
read(EventHandler handler)
{
BasicStream stream = handler._stream;
@@ -570,7 +636,21 @@ catch (RuntimeException ex)
if (stream.pos() != stream.size())
{
- handler.read(stream);
+ //
+ // On Win32 platforms, the selector may select a channel for
+ // reading even though no data is available. Therefore, we
+ // first try to read non-blocking; if we don't get any
+ // data, we ignore the read event.
+ //
+ boolean doRead = handler.tryRead(stream);
+ if (stream.pos() == 0)
+ {
+ return false;
+ }
+ if (doRead)
+ {
+ handler.read(stream);
+ }
assert(stream.pos() == stream.size());
}
@@ -608,6 +688,8 @@ catch (RuntimeException ex)
handler.read(stream);
assert(stream.pos() == stream.size());
}
+
+ return true;
}
private static final class HandlerInfo
@@ -639,10 +721,12 @@ catch (RuntimeException ex)
private Instance _instance;
private boolean _destroyed;
- private java.nio.channels.Pipe.SourceChannel _fdIntrRead;
+ private java.nio.channels.ReadableByteChannel _fdIntrRead;
private java.nio.channels.SelectionKey _fdIntrReadKey;
- private java.nio.channels.Pipe.SinkChannel _fdIntrWrite;
+ private java.nio.channels.WritableByteChannel _fdIntrWrite;
private java.nio.channels.Selector _selector;
+ private java.util.Set _keys;
+ private java.util.Iterator _keysIter;
private HandlerInfo _adds;
private RemoveInfo _removes;
private int _handlers;
@@ -702,6 +786,7 @@ catch (RuntimeException ex)
}
}
+//System.out.println("ThreadPool - run() terminated - promoting follower");
_pool.promoteFollower();
_pool = null; // Break cyclic dependency.
}