summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2002-12-16 13:21:06 +0000
committerMarc Laukien <marc@zeroc.com>2002-12-16 13:21:06 +0000
commit6ed1dedc9c45a35209d174c5c018bd68637b39d9 (patch)
tree9f38fdc277bae25dafeabe193c98c6e331fca5f3 /java/src/IceInternal/ThreadPool.java
parentminor fixes (diff)
downloadice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.tar.bz2
ice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.tar.xz
ice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.zip
reimplementation of shutdown/destroy/deactivate/etc.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java370
1 files changed, 152 insertions, 218 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 33b15884b42..4576880f122 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -29,11 +29,8 @@ public final class ThreadPool
{
if(TRACE_REGISTRATION)
{
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd +
- ", handler count = " + (_handlers + 1));
+ trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
}
-
- ++_handlers;
_changes.add(new FdHandlerPair(fd, handler));
setInterrupt(0);
}
@@ -88,41 +85,6 @@ public final class ThreadPool
setInterrupt(1);
}
- public synchronized void
- waitUntilFinished()
- {
- if(TRACE_SHUTDOWN)
- {
- trace("waiting until finished...");
- }
-
- while(_handlers != 0 && _threadNum != 0)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- if(_handlers != 0)
- {
- _instance.logger().error("can't wait for graceful application termination in thread pool\n" +
- "since all threads have vanished");
- }
- else
- {
- assert(_handlerMap.isEmpty());
- }
-
- if(TRACE_SHUTDOWN)
- {
- trace("finished.");
- }
- }
-
public void
joinWithAllThreads()
{
@@ -155,7 +117,6 @@ public final class ThreadPool
{
_instance = instance;
_destroyed = false;
- _handlers = 0;
_timeout = 0;
_multipleThreads = false;
_name = name;
@@ -183,23 +144,24 @@ public final class ThreadPool
//
_keys = _selector.selectedKeys();
+ int threadNum;
if(server)
{
_timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime");
_timeoutMillis = _timeout * 1000;
- _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
+ threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
}
else
{
- _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
+ threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
}
- if(_threadNum < 1)
+ if(threadNum < 1)
{
- _threadNum = 1;
+ threadNum = 1;
}
- if(_threadNum > 1)
+ if(threadNum > 1)
{
_multipleThreads = true;
}
@@ -216,8 +178,8 @@ public final class ThreadPool
try
{
- _threads = new EventHandlerThread[_threadNum];
- for(int i = 0; i < _threadNum; i++)
+ _threads = new EventHandlerThread[threadNum];
+ for(int i = 0; i < threadNum; i++)
{
_threads[i] = new EventHandlerThread(threadNamePrefix + _name + "-" + i);
_threads[i].start();
@@ -387,8 +349,6 @@ public final class ThreadPool
private void
run(BasicStream stream)
{
- boolean shutdown = false;
-
while(true)
{
if(_multipleThreads)
@@ -405,21 +365,6 @@ public final class ThreadPool
while(true)
{
- if(shutdown) // Shutdown has been initiated.
- {
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
- shutdown = false;
- ObjectAdapterFactory factory = _instance.objectAdapterFactory();
- if(factory != null)
- {
- factory.shutdown();
- }
- }
-
if(TRACE_REGISTRATION)
{
java.util.Set keys = _selector.keys();
@@ -433,7 +378,7 @@ public final class ThreadPool
}
select();
- if(_keys.size() == 0) // Timeout.
+ if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
{
if(TRACE_SELECT)
{
@@ -443,12 +388,13 @@ public final class ThreadPool
assert(_timeout > 0);
_timeout = 0;
_timeoutMillis = 0;
- shutdown = true;
+ initiateShutdown();
continue repeatSelect;
}
EventHandler handler = null;
boolean finished = false;
+ boolean shutdown = false;
synchronized(this)
{
@@ -493,71 +439,68 @@ public final class ThreadPool
shutdown = clearInterrupt();
- //
- // Server shutdown?
- //
- if(shutdown)
+ if(!shutdown)
{
- continue repeatSelect;
- }
-
- //
- // An event handler must have been registered or
- // unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- continue repeatSelect;
- }
- else // Removal if handler is not set.
- {
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " + change.fd);
- }
-
- // Don't goto repeatSelect; we have to call
- // finished() on the event handler below, outside
- // the thread synchronization.
- }
- }
- else
+ //
+ // An event handler must have been
+ // registered or unregistered.
+ //
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if(change.handler != null) // Addition if handler is set.
+ {
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
+
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ continue repeatSelect;
+ }
+ else // Removal if handler is not set.
+ {
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ pair.key.cancel();
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ // Don't goto repeatSelect; we have to
+ // call finished() on the event
+ // handler below, outside the thread
+ // synchronization.
+ }
+ }
+ }
+ else
{
java.nio.channels.SelectionKey key = null;
java.util.Iterator iter = _keys.iterator();
@@ -594,80 +537,92 @@ public final class ThreadPool
}
}
- assert(handler != null);
+ assert(handler != null || shutdown);
- if(finished)
+ if(shutdown) // Shutdown has been initiated.
{
- //
- // Notify a handler about it's removal from the thread
- // pool.
- //
- try
- {
- handler.finished(this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception while calling finished():\n" + sw.toString() + "\n" + handler.toString();
- _instance.logger().error(s);
- }
-
- synchronized(this)
+ if(TRACE_SHUTDOWN)
{
- assert(_handlers > 0);
- if(--_handlers == 0)
- {
- notifyAll(); // For waitUntilFinished().
- }
+ trace("shutdown detected");
}
- }
- else
- {
- //
- // If the handler is "readable", try to read a message.
- //
- try
- {
- if(handler.readable())
- {
- try
- {
- read(handler);
- }
- catch(Ice.TimeoutException ex) // Expected
- {
- continue repeatSelect;
- }
- catch(Ice.LocalException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- continue repeatSelect;
- }
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
- }
+ ObjectAdapterFactory factory = _instance.objectAdapterFactory();
+ if(factory == null)
+ {
+ continue repeatSelect;
+ }
- handler.message(stream, this);
- }
- finally
- {
- stream.reset();
- }
+ promoteFollower();
+ factory.shutdown();
}
-
- break; // inner while loop
+ else
+ {
+ if(finished)
+ {
+ //
+ // Notify a handler about it's removal from
+ // the thread pool.
+ //
+ try
+ {
+ handler.finished(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception while calling finished():\n" + sw.toString() + "\n" +
+ handler.toString();
+ _instance.logger().error(s);
+ }
+ }
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ try
+ {
+ if(handler.readable())
+ {
+ try
+ {
+ read(handler);
+ }
+ catch(Ice.TimeoutException ex) // Expected.
+ {
+ continue repeatSelect;
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(TRACE_EXCEPTION)
+ {
+ trace("informing handler (" + handler.getClass().getName() +
+ ") about exception " + ex);
+ ex.printStackTrace();
+ }
+
+ handler.exception(ex);
+ continue repeatSelect;
+ }
+
+ stream.swap(handler._stream);
+ assert(stream.pos() == stream.size());
+ }
+
+ handler.message(stream, this);
+ }
+ finally
+ {
+ stream.reset();
+ }
+ }
+ }
+
+ break; // Inner while loop.
}
}
}
@@ -890,7 +845,6 @@ public final class ThreadPool
private java.util.Set _keys;
private java.util.LinkedList _changes = new java.util.LinkedList();
private java.util.HashMap _handlerMap = new java.util.HashMap();
- private int _handlers;
private int _timeout;
private int _timeoutMillis;
private RecursiveMutex _threadMutex = new RecursiveMutex();
@@ -932,25 +886,6 @@ public final class ThreadPool
_instance.logger().error(s);
}
- synchronized(ThreadPool.this)
- {
- --_threadNum;
- assert(_threadNum >= 0);
-
- //
- // The notifyAll() shouldn't be needed, *except* if one of the
- // threads exits because of an exception. (Which is an error
- // condition in Ice and if it happens needs to be debugged.)
- // However, I call notifyAll() anyway, in all cases, using a
- // "defensive" programming approach when it comes to
- // multithreading.
- //
- if(_threadNum == 0)
- {
- ThreadPool.this.notifyAll(); // For waitUntil...Finished() methods.
- }
- }
-
if(TRACE_THREAD)
{
trace("run() terminated - promoting follower");
@@ -962,5 +897,4 @@ public final class ThreadPool
}
}
private EventHandlerThread[] _threads;
- private int _threadNum; // Number of running threads
}