summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2002-05-02 03:27:07 +0000
committerMark Spruiell <mes@zeroc.com>2002-05-02 03:27:07 +0000
commit5f342d668a30647b7d7182b4b296e65f7cfc2b07 (patch)
tree53f13aa7e79a77da4ca6133cf170812d3995d8b2 /java/src/IceInternal/ThreadPool.java
parentadding assertions (diff)
downloadice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.bz2
ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.xz
ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.zip
align with C++ changes for thread pool, properties, plug-ins
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java156
1 files changed, 40 insertions, 116 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 13a7a646930..94d71ada0f6 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -15,14 +15,7 @@ public final class ThreadPool
public synchronized void
_register(java.nio.channels.SelectableChannel fd, EventHandler handler)
{
- if (handler.server())
- {
- ++_servers;
- }
- else
- {
- ++_clients;
- }
+ ++_handlers;
_changes.add(new FdHandlerPair(fd, handler));
setInterrupt(0);
}
@@ -34,62 +27,24 @@ public final class ThreadPool
setInterrupt(0);
}
- public synchronized void
- serverIsNowClient()
- {
- ++_clients;
- assert(_servers > 0);
- --_servers;
- if (_servers == 0)
- {
- notifyAll(); // For waitUntil...Finished() methods.
- }
- }
-
- public synchronized void
- clientIsNowServer()
- {
- ++_servers;
- assert(_clients > 0);
- --_clients;
- }
-
public void
promoteFollower()
{
+ if (_multipleThreads)
+ {
//System.out.println("ThreadPool - promote follower - lock count = " + _threadMutex.count());
- _threadMutex.unlock();
+ _threadMutex.unlock();
+ }
}
public void
- initiateServerShutdown()
+ initiateShutdown()
{
//System.out.println("ThreadPool - initiate server shutdown");
setInterrupt(1);
}
public synchronized void
- waitUntilServerFinished()
- {
- while (_servers != 0 && _threadNum != 0)
- {
- try
- {
- wait();
- }
- catch (InterruptedException ex)
- {
- }
- }
-
- if (_servers != 0)
- {
- _logger.error("can't wait for graceful server termination in thread pool\n" +
- "since all threads have vanished");
- }
- }
-
- public synchronized void
waitUntilFinished()
{
while (_handlers != 0 && _threadNum != 0)
@@ -103,14 +58,10 @@ public final class ThreadPool
}
}
- if (_clients + _servers != 0)
+ if (_handlers != 0)
{
- _logger.error("can't wait for graceful application termination in thread pool\n" +
- "since all threads have vanished");
- }
- else
- {
- assert(_handlers == 0);
+ _instance.logger().error("can't wait for graceful server termination in thread pool\n" +
+ "since all threads have vanished");
}
}
@@ -139,38 +90,16 @@ public final class ThreadPool
}
}
- public synchronized void
- setMaxConnections(int maxConnections)
- {
- if (maxConnections < _threadNum + 1 && maxConnections != 0)
- {
- _maxConnections = _threadNum + 1;
- }
- else
- {
- _maxConnections = maxConnections;
- }
- }
-
- public synchronized int
- getMaxConnections()
- {
- return _maxConnections;
- }
-
//
// Only for use by Instance
//
- ThreadPool(Instance instance)
+ ThreadPool(Instance instance, boolean server)
{
_instance = instance;
- _logger = _instance.logger();
- _properties = _instance.properties();
_destroyed = false;
_handlers = 0;
- _clients = 0;
- _servers = 0;
_timeout = 0;
+ _multipleThreads = false;
Network.SocketPair pair = Network.createPipe();
_fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
@@ -206,13 +135,26 @@ public final class ThreadPool
//
_keysIter = null;
- _timeout = _properties.getPropertyAsInt("Ice.ServerIdleTime");
- _threadNum = _properties.getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10);
+ if (server)
+ {
+ _timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime");
+ _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10);
+ }
+ else
+ {
+ _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 10);
+ }
+
if (_threadNum < 1)
{
_threadNum = 1;
}
+ if (_threadNum > 1)
+ {
+ _multipleThreads = true;
+ }
+
try
{
_threads = new EventHandlerThread[_threadNum];
@@ -227,9 +169,6 @@ public final class ThreadPool
destroy();
throw ex;
}
-
- // Must be called after _threadNum is set.
- setMaxConnections(_properties.getPropertyAsInt("Ice.ThreadPool.MaxConnections"));
}
protected void
@@ -378,8 +317,11 @@ catch (RuntimeException ex)
while (true)
{
- _threadMutex.lock();
+ if (_multipleThreads)
+ {
+ _threadMutex.lock();
//System.out.println("ThreadPool - thread " + Thread.currentThread() + " has the lock");
+ }
repeatSelect:
@@ -520,7 +462,7 @@ catch (RuntimeException ex)
if (change.handler != null) // Addition if handler is set.
{
-//System.out.println("ThreadPool - adding handler");
+//System.err.println("ThreadPool - adding handler" + change.handler + ", stream = " + change.handler._stream);
int op;
if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
{
@@ -531,7 +473,6 @@ catch (RuntimeException ex)
op = java.nio.channels.SelectionKey.OP_ACCEPT;
}
- _handlers++;
try
{
change.fd.register(_selector, op, change.handler);
@@ -549,7 +490,6 @@ catch (RuntimeException ex)
assert(key != null);
handler = (EventHandler)key.attachment();
finished = true;
- --_handlers;
key.cancel();
// Don't goto repeatSelect; we have to call
// finished() on the event handler below, outside
@@ -587,6 +527,7 @@ catch (RuntimeException ex)
}
handler = (EventHandler)key.attachment();
+//System.err.println("Selected handler with stream " + handler._stream + " - key is " + (key.isValid() ? "" : "NOT ") + "valid");
}
}
@@ -598,25 +539,14 @@ catch (RuntimeException ex)
// Notify a handler about it's removal from the thread
// pool.
//
- handler.finished();
- handler._stream.destroy();
+ handler.finished(this);
synchronized (this)
{
- if (handler.server())
- {
- assert(_servers > 0);
- --_servers;
- }
- else
- {
- assert(_clients > 0);
- --_clients;
- }
-//System.out.println("ThreadPool - handler is finished - _handlers = " + _handlers + ", _clients = " + _clients + ", _servers = " + _servers);
- if (_clients == 0 || _servers == 0)
+ assert(_handlers > 0);
+ if (--_handlers == 0)
{
- notifyAll(); // For waitUntil...Finished() methods.
+ notifyAll(); // For waitUntilFinished().
}
}
}
@@ -656,7 +586,7 @@ catch (RuntimeException ex)
assert(stream.pos() == stream.size());
}
- handler.message(stream);
+ handler.message(stream, this);
}
finally
{
@@ -751,8 +681,6 @@ catch (RuntimeException ex)
}
private Instance _instance;
- private Ice.Logger _logger;
- private Ice.Properties _properties;
private boolean _destroyed;
private java.nio.channels.ReadableByteChannel _fdIntrRead;
private java.nio.channels.SelectionKey _fdIntrReadKey;
@@ -762,10 +690,9 @@ catch (RuntimeException ex)
private java.util.Iterator _keysIter;
private java.util.LinkedList _changes = new java.util.LinkedList();
private int _handlers;
- private int _clients;
- private int _servers;
private int _timeout;
private RecursiveMutex _threadMutex = new RecursiveMutex();
+ private boolean _multipleThreads;
private final static class EventHandlerThread extends Thread
{
@@ -790,7 +717,7 @@ catch (RuntimeException ex)
ex.printStackTrace(pw);
pw.flush();
String s = "exception in thread pool:\n" + sw.toString();
- _pool._logger.error(s);
+ _pool._instance.logger().error(s);
}
catch (RuntimeException ex)
{
@@ -799,7 +726,7 @@ catch (RuntimeException ex)
ex.printStackTrace(pw);
pw.flush();
String s = "unknown exception in thread pool:\n" + sw.toString();
- _pool._logger.error(s);
+ _pool._instance.logger().error(s);
}
synchronized(_pool)
@@ -832,7 +759,4 @@ catch (RuntimeException ex)
}
private EventHandlerThread[] _threads;
private int _threadNum; // Number of running threads
- private int _maxConnections; // Maximum number of connections. If set to
- // zero, the number of connections is not
- // limited.
}