summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2002-04-16 23:02:05 +0000
committerMark Spruiell <mes@zeroc.com>2002-04-16 23:02:05 +0000
commitf735a39d64deba47b416e41442d1321f9c972d8c (patch)
tree12a7b8ed7c594f13ee511cec9e409669e7ad90fd /java/src/IceInternal/ThreadPool.java
parentWin32 fixes for IceBox (diff)
downloadice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.bz2
ice-f735a39d64deba47b416e41442d1321f9c972d8c.tar.xz
ice-f735a39d64deba47b416e41442d1321f9c972d8c.zip
align with C++ - fixes for the thread pool
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java258
1 files changed, 143 insertions, 115 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 24e6043f586..c73731bccc4 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -19,19 +19,39 @@ public final class ThreadPool
{
++_servers;
}
- HandlerInfo info = new HandlerInfo(fd, handler);
- info.next = _adds;
- _adds = info;
- setInterrupt();
+ else
+ {
+ ++_clients;
+ }
+ _changes.add(new FdHandlerPair(fd, handler));
+ setInterrupt(0);
+ }
+
+ public synchronized void
+ unregister(java.nio.channels.SelectableChannel fd)
+ {
+ _changes.add(new FdHandlerPair(fd, null));
+ setInterrupt(0);
}
public synchronized void
- unregister(java.nio.channels.SelectableChannel fd, boolean callFinished)
+ serverIsNowClient()
{
- RemoveInfo info = new RemoveInfo(fd, callFinished);
- info.next = _removes;
- _removes = info;
- setInterrupt();
+ ++_clients;
+ assert(_servers > 0);
+ --_servers;
+ if (_servers == 0)
+ {
+ notifyAll(); // For waitUntil...Finished() methods.
+ }
+ }
+
+ public synchronized void
+ clientIsNowServer()
+ {
+ ++_servers;
+ assert(_clients > 0);
+ --_clients;
}
public void
@@ -41,26 +61,17 @@ public final class ThreadPool
_threadMutex.unlock();
}
- public synchronized void
+ public void
initiateServerShutdown()
{
//System.out.println("ThreadPool - initiate server shutdown");
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)1);
- try
- {
- int n = _fdIntrWrite.write(buf);
- assert(n == 1);
- }
- catch (java.io.IOException ex)
- {
- }
+ setInterrupt(1);
}
public synchronized void
waitUntilServerFinished()
{
- while (_servers != 0 && _threadNum != 0)
+ while (_clients + _servers != 0 && _threadNum != 0)
{
try
{
@@ -92,11 +103,15 @@ public final class ThreadPool
}
}
- if (_handlers != 0)
+ if (_clients + _servers != 0)
{
_logger.error("can't wait for graceful application termination in thread pool\n" +
"since all threads have vanished");
}
+ else
+ {
+ assert(_handlers == 0);
+ }
}
public void
@@ -152,9 +167,8 @@ public final class ThreadPool
_logger = _instance.logger();
_properties = _instance.properties();
_destroyed = false;
- _adds = null;
- _removes = null;
_handlers = 0;
+ _clients = 0;
_servers = 0;
_timeout = 0;
@@ -285,7 +299,7 @@ public final class ThreadPool
//System.out.println("ThreadPool - destroy");
assert(!_destroyed);
_destroyed = true;
- setInterrupt();
+ setInterrupt(0);
}
private boolean
@@ -302,7 +316,8 @@ catch (RuntimeException ex)
ex.printStackTrace();
}
*/
- boolean shutdown = false;
+ byte b = 0;
+
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
try
{
@@ -315,21 +330,22 @@ catch (RuntimeException ex)
}
//System.out.println(" clearInterrupt - got byte " + (int)buf.get(0));
- if (buf.get(0) == (byte)1) // Shutdown initiated?
- {
- shutdown = true;
- }
+ b = buf.get(0);
+ break;
}
}
catch (java.io.IOException ex)
{
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
}
- return shutdown;
+ return b == (byte)1; // Return true if shutdown has been initiated.
}
private void
- setInterrupt()
+ setInterrupt(int b)
{
//System.out.println("setInterrupt");
/*
@@ -343,14 +359,19 @@ catch (RuntimeException ex)
}
//*/
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- try
- {
- int n = _fdIntrWrite.write(buf);
- assert(n == 1);
- }
- catch (java.io.IOException ex)
+ buf.put(0, (byte)b);
+ while (buf.hasRemaining())
{
+ try
+ {
+ _fdIntrWrite.write(buf);
+ }
+ catch (java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
}
}
@@ -461,31 +482,66 @@ catch (RuntimeException ex)
}
EventHandler handler = null;
- RemoveInfo remove = null;
+ boolean finished = false;
synchronized (this)
{
- if (_destroyed)
+ if (_keys.remove(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
{
+ if (!_keys.isEmpty())
+ {
+ _keysIter = _keys.iterator();
+ }
+ else
+ {
+ _keysIter = null;
+ }
+
+ //
+ // There are three possibilities for an interrupt:
+ //
+ // - The thread pool has been destroyed.
+ //
+ // - Server shutdown has been initiated.
+ //
+ // - An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if (_destroyed)
+ {
//System.out.println("ThreadPool - destroyed, thread id = " + Thread.currentThread());
+ //
+ // Don't clear the interrupt fd if destroyed, so that
+ // the other threads exit as well.
+ //
+ return;
+ }
+
+ shutdown = clearInterrupt();
+
//
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
+ // Server shutdown?
//
- return;
- }
+ if (shutdown)
+ {
+ continue repeatSelect;
+ }
- if (_adds != null)
- {
//
- // New handlers have been added.
+ // An event handler must have been registered or
+ // unregistered.
//
- HandlerInfo info = _adds;
- while (info != null)
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if (change.handler != null) // Addition if handler is set.
{
-//System.out.println("ThreadPool - adding fd " + info.fd);
+//System.out.println("ThreadPool - adding handler");
int op;
- if ((info.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
{
op = java.nio.channels.SelectionKey.OP_READ;
}
@@ -497,37 +553,29 @@ catch (RuntimeException ex)
_handlers++;
try
{
- info.key = info.fd.register(_selector, op, info);
+ change.fd.register(_selector, op, change.handler);
}
catch (java.nio.channels.ClosedChannelException ex)
{
assert(false);
}
- HandlerInfo next = info.next;
- info.next = null;
- info = next;
+ continue repeatSelect;
+ }
+ else // Removal if handler is not set.
+ {
+//System.out.println("ThreadPool - removing handler");
+ java.nio.channels.SelectionKey key = change.fd.keyFor(_selector);
+ assert(key != null);
+ handler = (EventHandler)key.attachment();
+ finished = true;
+ --_handlers;
+ key.cancel();
+ // Don't goto repeatSelect; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
}
- _adds = null;
- }
-
- if (_removes != null)
- {
- //
- // Handlers are permanently removed.
- //
- remove = _removes;
- _removes = _removes.next;
- java.nio.channels.SelectionKey key = remove.fd.keyFor(_selector);
- assert(key != null);
- HandlerInfo hinfo = (HandlerInfo)key.attachment();
- key.cancel();
- handler = hinfo.handler;
-//System.out.println("ThreadPool - remove fd = " + remove.fd);
-//if (_removes != null)
-// System.out.println("ThreadPool - more fds to be removed");
}
-
- if (handler == null)
+ else
{
java.nio.channels.SelectionKey key = null;
while (_keysIter.hasNext())
@@ -537,7 +585,7 @@ catch (RuntimeException ex)
//
java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)_keysIter.next();
_keysIter.remove();
- if (k.isValid())
+ if (k.isValid() && key != _fdIntrReadKey)
{
//System.out.println("ThreadPool - found a key");
key = k;
@@ -557,41 +605,35 @@ catch (RuntimeException ex)
continue repeatSelect;
}
- if (key.channel() == _fdIntrRead)
- {
-//System.out.println("ThreadPool - input ready on the interrupt pipe");
- shutdown = clearInterrupt();
- continue repeatSelect;
- }
-
- HandlerInfo info = (HandlerInfo)key.attachment();
- assert(info != null);
- handler = info.handler;
+ handler = (EventHandler)key.attachment();
}
}
assert(handler != null);
- if (remove != null)
+ if (finished)
{
//
- // Call finished() on the handler if necessary.
+ // Notify a handler about it's removal from the thread
+ // pool.
//
- if (remove.callFinished)
- {
- handler.finished();
- handler._stream.destroy();
- }
+ handler.finished();
+ handler._stream.destroy();
synchronized (this)
{
- _handlers--;
if (handler.server())
{
+ assert(_servers > 0);
--_servers;
}
+ else
+ {
+ assert(_clients > 0);
+ --_clients;
+ }
//System.out.println("ThreadPool - _handlers = " + _handlers + ", _servers = " + _servers);
- if (_handlers == 0 || _servers == 0)
+ if (_clients == 0 || _servers == 0)
{
notifyAll(); // For waitUntil...Finished() methods.
}
@@ -624,6 +666,7 @@ catch (RuntimeException ex)
}
catch (Ice.LocalException ex)
{
+//System.out.println("ThreadPool - informing handler about exception " + ex);
handler.exception(ex);
continue repeatSelect;
}
@@ -714,33 +757,18 @@ catch (RuntimeException ex)
return true;
}
- private static final class HandlerInfo
+ private static final class FdHandlerPair
{
java.nio.channels.SelectableChannel fd;
EventHandler handler;
- java.nio.channels.SelectionKey key;
- HandlerInfo next;
- HandlerInfo(java.nio.channels.SelectableChannel fd, EventHandler handler)
+ FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler)
{
this.fd = fd;
this.handler = handler;
}
}
- private static final class RemoveInfo
- {
- java.nio.channels.SelectableChannel fd;
- boolean callFinished;
- RemoveInfo next;
-
- RemoveInfo(java.nio.channels.SelectableChannel fd, boolean callFinished)
- {
- this.fd = fd;
- this.callFinished = callFinished;
- }
- }
-
private Instance _instance;
private Ice.Logger _logger;
private Ice.Properties _properties;
@@ -751,9 +779,9 @@ catch (RuntimeException ex)
private java.nio.channels.Selector _selector;
private java.util.Set _keys;
private java.util.Iterator _keysIter;
- private HandlerInfo _adds;
- private RemoveInfo _removes;
+ private java.util.LinkedList _changes = new java.util.LinkedList();
private int _handlers;
+ private int _clients;
private int _servers;
private int _timeout;
private RecursiveMutex _threadMutex = new RecursiveMutex();