summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java224
1 files changed, 114 insertions, 110 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 2b431a6c333..7460c975679 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -19,7 +19,7 @@ public final class ThreadPool
{
++_servers;
}
- _adds.add(new FdHandlerPair(fd, handler));
+ _adds.add(new HandlerInfo(fd, handler));
setInterrupt();
}
@@ -37,22 +37,12 @@ public final class ThreadPool
_threadMutex.unlock();
}
- public void
+ public synchronized void
initiateServerShutdown()
{
- // Can't use _fdIntrWriteBuf because it's not thread-safe
- final byte[] arr = { 1 };
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(arr);
- try
- {
- _fdIntrWrite.write(buf);
- }
- catch (java.io.IOException ex)
- {
- Ice.SystemException sys = new Ice.SystemException();
- sys.initCause(ex);
- throw sys;
- }
+ assert(!_shutdown);
+ _shutdown = true;
+ setInterrupt();
}
public synchronized void
@@ -80,7 +70,7 @@ public final class ThreadPool
public synchronized void
waitUntilFinished()
{
- while (_handlerMap.size() > 0 && _threadNum > 0)
+ while (_handlers > 0 && _threadNum > 0)
{
try
{
@@ -91,7 +81,7 @@ public final class ThreadPool
}
}
- if (_handlerMap.size() > 0)
+ if (_handlers > 0)
{
_instance.logger().error("can't wait for graceful application " +
"termination in thread pool\n" +
@@ -151,21 +141,15 @@ public final class ThreadPool
{
_instance = instance;
_destroyed = false;
+ _interrupted = false;
+ _shutdown = false;
+ _handlers = 0;
_servers = 0;
_timeout = 0;
try
{
_selector = java.nio.channels.Selector.open();
- _pipe = java.nio.channels.Pipe.open();
- _fdIntrRead = _pipe.source();
- _fdIntrRead.configureBlocking(false);
- _fdIntrWrite = _pipe.sink();
- _fdIntrReadBuf = java.nio.ByteBuffer.allocateDirect(1);
- _fdIntrWriteBuf = java.nio.ByteBuffer.allocateDirect(1);
- _fdIntrRead.register(_selector,
- java.nio.channels.SelectionKey.OP_READ);
- _fdIntrReadKey = _fdIntrRead.keyFor(_selector);
}
catch (java.io.IOException ex)
{
@@ -208,6 +192,7 @@ public final class ThreadPool
}
}
+ _threads = new EventHandlerThread[_threadNum];
for (int i = 0; i < _threadNum; i++)
{
_threads[i] = new EventHandlerThread(this);
@@ -243,21 +228,11 @@ public final class ThreadPool
throws Throwable
{
assert(_destroyed);
- if (_fdIntrWrite != null)
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch (java.io.IOException ex)
- {
- }
- }
- if (_fdIntrRead != null)
+ if (_selector != null)
{
try
{
- _fdIntrRead.close();
+ _selector.close();
}
catch (java.io.IOException ex)
{
@@ -267,7 +242,10 @@ public final class ThreadPool
super.finalize();
}
- private synchronized void
+ //
+ // Called by Instance
+ //
+ synchronized void
destroy()
{
assert(!_destroyed);
@@ -278,44 +256,17 @@ public final class ThreadPool
private boolean
clearInterrupt()
{
- try
- {
- boolean shutdown = false;
- _fdIntrReadBuf.rewind();
- while (_fdIntrRead.read(_fdIntrReadBuf) == 1)
- {
- if (_fdIntrReadBuf.get(0) == 1) // Shutdown initiated?
- {
- shutdown = true;
- }
- _fdIntrReadBuf.rewind();
- }
-
- return shutdown;
- }
- catch (java.io.IOException ex)
- {
- Ice.SystemException sys = new Ice.SystemException();
- sys.initCause(ex);
- throw sys;
- }
+ _interrupted = false;
+ boolean shutdown = _shutdown;
+ _shutdown = false;
+ return shutdown;
}
private void
setInterrupt()
{
- _fdIntrWriteBuf.rewind();
- _fdIntrWriteBuf.put(0, (byte)0);
- try
- {
- _fdIntrWrite.write(_fdIntrReadBuf);
- }
- catch (java.io.IOException ex)
- {
- Ice.SystemException sys = new Ice.SystemException();
- sys.initCause(ex);
- throw sys;
- }
+ _interrupted = true;
+ _selector.wakeup(); // Causes select() to return immediately
}
private void
@@ -376,11 +327,10 @@ public final class ThreadPool
return;
}
- boolean interrupt = false;
- if (_fdIntrReadKey.isReadable())
+ boolean interrupt = _interrupted;
+ if (interrupt)
{
shutdown = clearInterrupt();
- interrupt = true;
}
if (!_adds.isEmpty())
@@ -391,14 +341,15 @@ public final class ThreadPool
java.util.ListIterator p = _adds.listIterator();
while (p.hasNext())
{
- FdHandlerPair pair = (FdHandlerPair)p.next();
- _reapList.addFirst(pair.fd);
+ HandlerInfo info = (HandlerInfo)p.next();
+ addHandler(info);
+ _handlers++;
try
{
- pair.fd.register(
+ info.fd.register(
_selector,
java.nio.channels.SelectionKey.OP_READ,
- pair.handler);
+ info);
}
catch (java.io.ClosedChannelException ex)
{
@@ -419,19 +370,20 @@ public final class ThreadPool
java.nio.channels.SelectionKey key =
(java.nio.channels.SelectionKey)p.next();
key.cancel();
- handler = (EventHandler)key.attachment();
- assert(handler != null);
- handler.finished();
- if (handler.server())
+ HandlerInfo info = (HandlerInfo)key.attachment();
+ assert(info != null);
+ info.handler.finished();
+ if (info.handler.server())
{
--_servers;
}
- _reapList.remove(key.channel());
+
+ _handlers--;
+ removeHandler(info);
}
_removes.clear();
- // TODO
- if (_handlerMap.empty() || _servers == 0)
+ if (_handlers == 0 || _servers == 0)
{
notifyAll(); // For waitUntil...Finished() methods.
}
@@ -446,30 +398,45 @@ public final class ThreadPool
// Check if there are connections to reap.
//
reap = false;
- // TODO
- if (_maxConnections > 0 &&
- _handlerMap.size() > _maxConnections)
+ if (_maxConnections > 0 && _handlers > _maxConnections)
{
- // TODO
+ HandlerInfo info = _reapListEnd;
+ while (info != null)
+ {
+ if (!info.reaped)
+ {
+ info.reaped = true;
+ handler = info.handler;
+ reap = true;
+ break;
+ }
+ info = info.prev;
+ }
}
if (!reap)
{
java.util.Set keys = _selector.selectedKeys();
java.util.Iterator i = keys.iterator();
- while (handler == null && i.hasNext())
+ assert(i.hasNext());
+ java.nio.channels.SelectionKey key =
+ (java.nio.channels.SelectionKey)i.next();
+ HandlerInfo info = (HandlerInfo)key.attachment();
+ i.remove();
+ assert(info != null);
+
+ //
+ // Make the fd for the handler the most recently used
+ // one by moving it to the beginning of the the reap
+ // list.
+ //
+ if (info != _reapList)
{
- java.nio.channels.SelectionKey key =
- (java.nio.channels.SelectionKey)i.next();
- if (key != _fdIntrReadKey)
- {
- handler = (EventHandler)key.attachment();
- }
- i.remove();
+ removeHandler(info);
+ addHandler(info);
}
- assert(handler != null);
- // TODO: Update _reapList
+ handler = info.handler;
}
}
@@ -530,13 +497,52 @@ public final class ThreadPool
// TODO
}
- private static final class FdHandlerPair
+ private void
+ addHandler(HandlerInfo info)
+ {
+ info.next = _reapList;
+ info.prev = null;
+ if (_reapList != null)
+ {
+ _reapList.prev = info;
+ }
+ else
+ {
+ _reapListEnd = info;
+ }
+ _reapList = info;
+ }
+
+ private void
+ removeHandler(HandlerInfo info)
+ {
+ //
+ // Remove from _reapList
+ //
+ if (info.prev == null)
+ {
+ _reapList = info.next;
+ }
+ else
+ {
+ info.prev.next = info.next;
+ }
+ if (info.next == null)
+ {
+ _reapListEnd = info.prev;
+ }
+ }
+
+ private static final class HandlerInfo
{
java.nio.channels.SelectableChannel fd;
EventHandler handler;
+ HandlerInfo prev;
+ HandlerInfo next;
+ boolean reaped;
- FdHandlerPair(java.nio.channels.SelectableChannel fd,
- EventHandler handler)
+ HandlerInfo(java.nio.channels.SelectableChannel fd,
+ EventHandler handler)
{
this.fd = fd;
this.handler = handler;
@@ -546,15 +552,13 @@ public final class ThreadPool
private Instance _instance;
private boolean _destroyed;
private java.nio.channels.Selector _selector;
- private java.nio.channels.Pipe _pipe;
- private java.nio.channels.Pipe.SourceChannel _fdIntrRead;
- private java.nio.channels.Pipe.SinkChannel _fdIntrWrite;
- private java.nio.ByteBuffer _fdIntrReadBuf;
- private java.nio.ByteBuffer _fdIntrWriteBuf;
- private java.nio.channels.SelectionKey _fdIntrReadKey;
+ private boolean _interrupted;
+ private boolean _shutdown;
private java.util.LinkedList _adds = new java.util.LinkedList();
private java.util.LinkedList _removes = new java.util.LinkedList();
- private java.util.LinkedList _reapList = new java.util.LinkedList();
+ private int _handlers;
+ private HandlerInfo _reapList = null;
+ private HandlerInfo _reapListEnd = null;
private int _servers;
private int _timeout;
private RecursiveMutex _threadMutex = new RecursiveMutex();