diff options
author | Marc Laukien <marc@zeroc.com> | 2002-12-16 13:21:06 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2002-12-16 13:21:06 +0000 |
commit | 6ed1dedc9c45a35209d174c5c018bd68637b39d9 (patch) | |
tree | 9f38fdc277bae25dafeabe193c98c6e331fca5f3 /java/src/IceInternal/ThreadPool.java | |
parent | minor fixes (diff) | |
download | ice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.tar.bz2 ice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.tar.xz ice-6ed1dedc9c45a35209d174c5c018bd68637b39d9.zip |
reimplementation of shutdown/destroy/deactivate/etc.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 370 |
1 files changed, 152 insertions, 218 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 33b15884b42..4576880f122 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -29,11 +29,8 @@ public final class ThreadPool { if(TRACE_REGISTRATION) { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd + - ", handler count = " + (_handlers + 1)); + trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd); } - - ++_handlers; _changes.add(new FdHandlerPair(fd, handler)); setInterrupt(0); } @@ -88,41 +85,6 @@ public final class ThreadPool setInterrupt(1); } - public synchronized void - waitUntilFinished() - { - if(TRACE_SHUTDOWN) - { - trace("waiting until finished..."); - } - - while(_handlers != 0 && _threadNum != 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_handlers != 0) - { - _instance.logger().error("can't wait for graceful application termination in thread pool\n" + - "since all threads have vanished"); - } - else - { - assert(_handlerMap.isEmpty()); - } - - if(TRACE_SHUTDOWN) - { - trace("finished."); - } - } - public void joinWithAllThreads() { @@ -155,7 +117,6 @@ public final class ThreadPool { _instance = instance; _destroyed = false; - _handlers = 0; _timeout = 0; _multipleThreads = false; _name = name; @@ -183,23 +144,24 @@ public final class ThreadPool // _keys = _selector.selectedKeys(); + int threadNum; if(server) { _timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime"); _timeoutMillis = _timeout * 1000; - _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); + threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); } else { - _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); + threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); } - if(_threadNum < 1) + if(threadNum < 1) { - _threadNum = 1; + threadNum = 1; } - if(_threadNum > 1) + if(threadNum > 1) { _multipleThreads = true; } @@ -216,8 +178,8 @@ public final class ThreadPool try { - _threads = new EventHandlerThread[_threadNum]; - for(int i = 0; i < _threadNum; i++) + _threads = new EventHandlerThread[threadNum]; + for(int i = 0; i < threadNum; i++) { _threads[i] = new EventHandlerThread(threadNamePrefix + _name + "-" + i); _threads[i].start(); @@ -387,8 +349,6 @@ public final class ThreadPool private void run(BasicStream stream) { - boolean shutdown = false; - while(true) { if(_multipleThreads) @@ -405,21 +365,6 @@ public final class ThreadPool while(true) { - if(shutdown) // Shutdown has been initiated. - { - if(TRACE_SHUTDOWN) - { - trace("shutdown detected"); - } - - shutdown = false; - ObjectAdapterFactory factory = _instance.objectAdapterFactory(); - if(factory != null) - { - factory.shutdown(); - } - } - if(TRACE_REGISTRATION) { java.util.Set keys = _selector.keys(); @@ -433,7 +378,7 @@ public final class ThreadPool } select(); - if(_keys.size() == 0) // Timeout. + if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. { if(TRACE_SELECT) { @@ -443,12 +388,13 @@ public final class ThreadPool assert(_timeout > 0); _timeout = 0; _timeoutMillis = 0; - shutdown = true; + initiateShutdown(); continue repeatSelect; } EventHandler handler = null; boolean finished = false; + boolean shutdown = false; synchronized(this) { @@ -493,71 +439,68 @@ public final class ThreadPool shutdown = clearInterrupt(); - // - // Server shutdown? - // - if(shutdown) + if(!shutdown) { - continue repeatSelect; - } - - // - // An event handler must have been registered or - // unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. - { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else - { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } - - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } - - continue repeatSelect; - } - else // Removal if handler is not set. - { - HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + change.fd); - } - - // Don't goto repeatSelect; we have to call - // finished() on the event handler below, outside - // the thread synchronization. - } - } - else + // + // An event handler must have been + // registered or unregistered. + // + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if(change.handler != null) // Addition if handler is set. + { + int op; + if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + { + op = java.nio.channels.SelectionKey.OP_READ; + } + else + { + op = java.nio.channels.SelectionKey.OP_ACCEPT; + } + + java.nio.channels.SelectionKey key = null; + try + { + key = change.fd.register(_selector, op, change.handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } + _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); + + if(TRACE_REGISTRATION) + { + trace("added handler (" + change.handler.getClass().getName() + ") for fd " + + change.fd); + } + + continue repeatSelect; + } + else // Removal if handler is not set. + { + HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); + assert(pair != null); + handler = pair.handler; + finished = true; + pair.key.cancel(); + + if(TRACE_REGISTRATION) + { + trace("removed handler (" + handler.getClass().getName() + ") for fd " + + change.fd); + } + + // Don't goto repeatSelect; we have to + // call finished() on the event + // handler below, outside the thread + // synchronization. + } + } + } + else { java.nio.channels.SelectionKey key = null; java.util.Iterator iter = _keys.iterator(); @@ -594,80 +537,92 @@ public final class ThreadPool } } - assert(handler != null); + assert(handler != null || shutdown); - if(finished) + if(shutdown) // Shutdown has been initiated. { - // - // Notify a handler about it's removal from the thread - // pool. - // - try - { - handler.finished(this); - } - catch(Ice.LocalException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception while calling finished():\n" + sw.toString() + "\n" + handler.toString(); - _instance.logger().error(s); - } - - synchronized(this) + if(TRACE_SHUTDOWN) { - assert(_handlers > 0); - if(--_handlers == 0) - { - notifyAll(); // For waitUntilFinished(). - } + trace("shutdown detected"); } - } - else - { - // - // If the handler is "readable", try to read a message. - // - try - { - if(handler.readable()) - { - try - { - read(handler); - } - catch(Ice.TimeoutException ex) // Expected - { - continue repeatSelect; - } - catch(Ice.LocalException ex) - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - continue repeatSelect; - } - stream.swap(handler._stream); - assert(stream.pos() == stream.size()); - } + ObjectAdapterFactory factory = _instance.objectAdapterFactory(); + if(factory == null) + { + continue repeatSelect; + } - handler.message(stream, this); - } - finally - { - stream.reset(); - } + promoteFollower(); + factory.shutdown(); } - - break; // inner while loop + else + { + if(finished) + { + // + // Notify a handler about it's removal from + // the thread pool. + // + try + { + handler.finished(this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception while calling finished():\n" + sw.toString() + "\n" + + handler.toString(); + _instance.logger().error(s); + } + } + else + { + // + // If the handler is "readable", try to read a + // message. + // + try + { + if(handler.readable()) + { + try + { + read(handler); + } + catch(Ice.TimeoutException ex) // Expected. + { + continue repeatSelect; + } + catch(Ice.LocalException ex) + { + if(TRACE_EXCEPTION) + { + trace("informing handler (" + handler.getClass().getName() + + ") about exception " + ex); + ex.printStackTrace(); + } + + handler.exception(ex); + continue repeatSelect; + } + + stream.swap(handler._stream); + assert(stream.pos() == stream.size()); + } + + handler.message(stream, this); + } + finally + { + stream.reset(); + } + } + } + + break; // Inner while loop. } } } @@ -890,7 +845,6 @@ public final class ThreadPool private java.util.Set _keys; private java.util.LinkedList _changes = new java.util.LinkedList(); private java.util.HashMap _handlerMap = new java.util.HashMap(); - private int _handlers; private int _timeout; private int _timeoutMillis; private RecursiveMutex _threadMutex = new RecursiveMutex(); @@ -932,25 +886,6 @@ public final class ThreadPool _instance.logger().error(s); } - synchronized(ThreadPool.this) - { - --_threadNum; - assert(_threadNum >= 0); - - // - // The notifyAll() shouldn't be needed, *except* if one of the - // threads exits because of an exception. (Which is an error - // condition in Ice and if it happens needs to be debugged.) - // However, I call notifyAll() anyway, in all cases, using a - // "defensive" programming approach when it comes to - // multithreading. - // - if(_threadNum == 0) - { - ThreadPool.this.notifyAll(); // For waitUntil...Finished() methods. - } - } - if(TRACE_THREAD) { trace("run() terminated - promoting follower"); @@ -962,5 +897,4 @@ public final class ThreadPool } } private EventHandlerThread[] _threads; - private int _threadNum; // Number of running threads } |