summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java231
1 files changed, 164 insertions, 67 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index ae1398e1a78..b08b4bbeaea 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -28,16 +28,10 @@ public final class ThreadPool
public synchronized void
unregister(java.nio.channels.SelectableChannel fd, boolean callFinished)
{
- java.nio.channels.SelectionKey key = fd.keyFor(_selector);
- if (key != null)
- {
- HandlerInfo info = (HandlerInfo)key.attachment();
- assert(info != null);
- info.callFinished = callFinished;
- info.next = _removes;
- _removes = info;
- setInterrupt();
- }
+ RemoveInfo info = new RemoveInfo(fd, callFinished);
+ info.next = _removes;
+ _removes = info;
+ setInterrupt();
}
public void
@@ -49,9 +43,16 @@ public final class ThreadPool
public synchronized void
initiateServerShutdown()
{
- assert(!_shutdown);
- _shutdown = true;
- setInterrupt(); // TODO: just use wakeup?
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ buf.put(0, (byte)1);
+ try
+ {
+ int n = _fdIntrWrite.write(buf);
+ assert(n == 1);
+ }
+ catch (java.io.IOException ex)
+ {
+ }
}
public synchronized void
@@ -70,8 +71,7 @@ public final class ThreadPool
if (_servers != 0)
{
- _instance.logger().error("can't wait for graceful server " +
- "termination in thread pool\n" +
+ _instance.logger().error("can't wait for graceful server termination in thread pool\n" +
"since all threads have vanished");
}
}
@@ -92,8 +92,7 @@ public final class ThreadPool
if (_handlers != 0)
{
- _instance.logger().error("can't wait for graceful application " +
- "termination in thread pool\n" +
+ _instance.logger().error("can't wait for graceful application termination in thread pool\n" +
"since all threads have vanished");
}
}
@@ -149,8 +148,6 @@ public final class ThreadPool
{
_instance = instance;
_destroyed = false;
- _interrupted = false;
- _shutdown = false;
_adds = null;
_removes = null;
_handlers = 0;
@@ -159,7 +156,22 @@ public final class ThreadPool
try
{
+ java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open();
+ _fdIntrRead = pipe.source();
+ _fdIntrWrite = pipe.sink();
+ _fdIntrRead.configureBlocking(false);
+ }
+ catch (java.io.IOException ex)
+ {
+ Ice.SystemException sys = new Ice.SystemException();
+ sys.initCause(ex);
+ throw sys;
+ }
+
+ try
+ {
_selector = java.nio.channels.Selector.open();
+ _fdIntrReadKey = _fdIntrRead.register(_selector, java.nio.channels.SelectionKey.OP_READ);
}
catch (java.io.IOException ex)
{
@@ -168,8 +180,7 @@ public final class ThreadPool
throw sys;
}
- String value =
- _instance.properties().getProperty("Ice.ServerIdleTime");
+ String value = _instance.properties().getProperty("Ice.ServerIdleTime");
if (value != null)
{
try
@@ -217,8 +228,7 @@ public final class ThreadPool
// Must be called after _threadNum is set
int maxConnections = 0;
- value = _instance.properties().getProperty(
- "Ice.ThreadPool.MaxConnections");
+ value = _instance.properties().getProperty("Ice.ThreadPool.MaxConnections");
if (value != null)
{
try
@@ -248,6 +258,26 @@ public final class ThreadPool
{
}
}
+ if (_fdIntrWrite != null)
+ {
+ try
+ {
+ _fdIntrWrite.close();
+ }
+ catch (java.io.IOException ex)
+ {
+ }
+ }
+ if (_fdIntrRead != null)
+ {
+ try
+ {
+ _fdIntrRead.close();
+ }
+ catch (java.io.IOException ex)
+ {
+ }
+ }
super.finalize();
}
@@ -266,17 +296,67 @@ public final class ThreadPool
private boolean
clearInterrupt()
{
- _interrupted = false;
- boolean shutdown = _shutdown;
- _shutdown = false;
+/*
+System.out.println("clearInterrupt");
+try
+{
+ throw new RuntimeException();
+}
+catch (RuntimeException ex)
+{
+ ex.printStackTrace();
+}
+*/
+ boolean shutdown = false;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ try
+ {
+ while (true)
+ {
+ buf.rewind();
+ if (_fdIntrRead.read(buf) != 1)
+ {
+ break;
+ }
+
+//System.out.println(" got byte " + (int)buf.get(0));
+ if (buf.get(0) == (byte)1) // Shutdown initiated?
+ {
+ shutdown = true;
+ }
+ }
+ }
+ catch (java.io.IOException ex)
+ {
+ }
+
return shutdown;
}
private void
setInterrupt()
{
- _interrupted = true;
- _selector.wakeup(); // Causes select() to return immediately
+/*
+System.out.println("setInterrupt");
+try
+{
+ throw new RuntimeException();
+}
+catch (RuntimeException ex)
+{
+ ex.printStackTrace();
+}
+*/
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ buf.put(0, (byte)0);
+ try
+ {
+ int n = _fdIntrWrite.write(buf);
+ assert(n == 1);
+ }
+ catch (java.io.IOException ex)
+ {
+ }
}
private void
@@ -284,6 +364,7 @@ public final class ThreadPool
{
boolean shutdown = false;
final int timeoutMillis = _timeout * 1000;
+ java.util.Set keys = _selector.selectedKeys();
while (true)
{
@@ -295,11 +376,12 @@ public final class ThreadPool
{
if (shutdown) // Shutdown has been initiated.
{
+//System.out.println("ThreadPool - shutdown");
shutdown = false;
_instance.objectAdapterFactory().shutdown();
}
- _selector.selectedKeys().clear();
+ keys.clear();
int ret = 0;
try
{
@@ -316,7 +398,8 @@ public final class ThreadPool
throw se;
}
- if (ret == 0 && !_interrupted) // Timeout.
+//System.out.println("Select() returned " + ret);
+ if (ret == 0) // Timeout.
{
assert(_timeout > 0);
_timeout = 0;
@@ -330,11 +413,11 @@ public final class ThreadPool
{
if (_destroyed)
{
+//System.out.println("ThreadPool - destroyed");
//
// Don't clear the interrupt fd if destroyed, so that
// the other threads exit as well.
//
- _selector.wakeup();
return;
}
@@ -347,8 +430,7 @@ public final class ThreadPool
while (info != null)
{
int op;
- if ((info.fd.validOps() &
- java.nio.channels.SelectionKey.OP_READ) > 0)
+ if ((info.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
{
op = java.nio.channels.SelectionKey.OP_READ;
}
@@ -360,8 +442,7 @@ public final class ThreadPool
_handlers++;
try
{
- info.key =
- info.fd.register(_selector, op, info);
+ info.key = info.fd.register(_selector, op, info);
}
catch (java.nio.channels.ClosedChannelException ex)
{
@@ -379,15 +460,18 @@ public final class ThreadPool
//
// Handlers are permanently removed.
//
- HandlerInfo info = _removes;
+ RemoveInfo info = _removes;
while (info != null)
{
- info.key.cancel();
- if (info.callFinished)
+ java.nio.channels.SelectionKey key = info.fd.keyFor(_selector);
+ assert(key != null);
+ HandlerInfo hinfo = (HandlerInfo)key.attachment();
+ key.cancel();
+ if (info.callFinished) // Call finished() on the handler?
{
- info.handler.finished();
+ hinfo.handler.finished();
}
- if (info.handler.server())
+ if (hinfo.handler.server())
{
--_servers;
}
@@ -409,36 +493,37 @@ public final class ThreadPool
continue repeatSelect;
}
- java.util.Set keys = _selector.selectedKeys();
- if (keys.size() == 0)
- {
- shutdown = clearInterrupt();
- continue repeatSelect;
- }
-
+ java.nio.channels.SelectionKey key = null;
java.util.Iterator i = keys.iterator();
while (i.hasNext())
{
- java.nio.channels.SelectionKey key =
- (java.nio.channels.SelectionKey)i.next();
//
- // Ignore selection keys that have been
- // cancelled
+ // Ignore selection keys that have been cancelled
//
- if (key.isValid())
+ java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)i.next();
+ if (k.isValid())
{
- HandlerInfo info =
- (HandlerInfo)key.attachment();
- assert(info != null);
- handler = info.handler;
+ key = k;
break;
}
}
- if (handler == null)
+ if (key == null)
{
+//System.out.println("ThreadPool - didn't find a valid key");
+ continue repeatSelect;
+ }
+
+ if (key.channel() == _fdIntrRead)
+ {
+//System.out.println("ThreadPool - input ready on the interrupt pipe");
+ shutdown = clearInterrupt();
continue repeatSelect;
}
+
+ HandlerInfo info = (HandlerInfo)key.attachment();
+ assert(info != null);
+ handler = info.handler;
}
//
@@ -486,10 +571,11 @@ public final class ThreadPool
if (stream.pos() != stream.size())
{
handler.read(stream);
- assert(stream.pos() != stream.size());
+ assert(stream.pos() == stream.size());
}
int pos = stream.pos();
+ assert(pos >= Protocol.headerSize);
stream.pos(0);
byte protVer = stream.readByte();
if (protVer != Protocol.protocolVersion)
@@ -520,7 +606,7 @@ public final class ThreadPool
if (stream.pos() != stream.size())
{
handler.read(stream);
- assert(stream.pos() != stream.size());
+ assert(stream.pos() == stream.size());
}
}
@@ -530,23 +616,35 @@ public final class ThreadPool
EventHandler handler;
java.nio.channels.SelectionKey key;
HandlerInfo next;
- boolean callFinished;
- HandlerInfo(java.nio.channels.SelectableChannel fd,
- EventHandler handler)
+ HandlerInfo(java.nio.channels.SelectableChannel fd, EventHandler handler)
{
this.fd = fd;
this.handler = handler;
}
}
+ private static final class RemoveInfo
+ {
+ java.nio.channels.SelectableChannel fd;
+ boolean callFinished;
+ RemoveInfo next;
+
+ RemoveInfo(java.nio.channels.SelectableChannel fd, boolean callFinished)
+ {
+ this.fd = fd;
+ this.callFinished = callFinished;
+ }
+ }
+
private Instance _instance;
private boolean _destroyed;
+ private java.nio.channels.Pipe.SourceChannel _fdIntrRead;
+ private java.nio.channels.SelectionKey _fdIntrReadKey;
+ private java.nio.channels.Pipe.SinkChannel _fdIntrWrite;
private java.nio.channels.Selector _selector;
- private boolean _interrupted;
- private boolean _shutdown;
private HandlerInfo _adds;
- private HandlerInfo _removes;
+ private RemoveInfo _removes;
private int _handlers;
private int _servers;
private int _timeout;
@@ -581,8 +679,7 @@ public final class ThreadPool
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
ex.printStackTrace(pw);
pw.flush();
- String s = "unknown exception in thread pool:\n" +
- sw.toString();
+ String s = "unknown exception in thread pool:\n" + sw.toString();
_pool._instance.logger().error(s);
}