summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java1528
1 files changed, 764 insertions, 764 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 948c56d4418..30ed86a3082 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -26,24 +26,24 @@ public final class ThreadPool
_destroyed = false;
_prefix = prefix;
_timeout = timeout;
- _threadIndex = 0;
- _running = 0;
- _inUse = 0;
- _load = 1.0;
- _promote = true;
- _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
-
- String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
+ _threadIndex = 0;
+ _running = 0;
+ _inUse = 0;
+ _load = 1.0;
+ _promote = true;
+ _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+
+ String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
if(programName.length() > 0)
{
_programNamePrefix = programName + "-";
}
- else
- {
- _programNamePrefix = "";
- }
+ else
+ {
+ _programNamePrefix = "";
+ }
- Network.SocketPair pair = Network.createPipe();
+ Network.SocketPair pair = Network.createPipe();
_fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
_fdIntrWrite = pair.sink;
@@ -66,64 +66,64 @@ public final class ThreadPool
//
_keys = _selector.selectedKeys();
- //
- // We use just one thread as the default. This is the fastest
- // possible setting, still allows one level of nesting, and
- // doesn't require to make the servants thread safe.
- //
- int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
- if(size < 1)
- {
- String s = _prefix + ".Size < 1; Size adjusted to 1";
- _instance.initializationData().logger.warning(s);
- size = 1;
- }
-
- int sizeMax =
- _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
- if(sizeMax < size)
- {
- String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
- _instance.initializationData().logger.warning(s);
- sizeMax = size;
- }
-
- int sizeWarn = _instance.initializationData().properties.getPropertyAsIntWithDefault(
- _prefix + ".SizeWarn", sizeMax * 80 / 100);
- if(sizeWarn > sizeMax)
- {
- String s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")";
- _instance.initializationData().logger.warning(s);
- sizeWarn = sizeMax;
- }
-
- _size = size;
- _sizeMax = sizeMax;
- _sizeWarn = sizeWarn;
-
- try
+ //
+ // We use just one thread as the default. This is the fastest
+ // possible setting, still allows one level of nesting, and
+ // doesn't require to make the servants thread safe.
+ //
+ int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ if(size < 1)
+ {
+ String s = _prefix + ".Size < 1; Size adjusted to 1";
+ _instance.initializationData().logger.warning(s);
+ size = 1;
+ }
+
+ int sizeMax =
+ _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax < size)
+ {
+ String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
+ _instance.initializationData().logger.warning(s);
+ sizeMax = size;
+ }
+
+ int sizeWarn = _instance.initializationData().properties.getPropertyAsIntWithDefault(
+ _prefix + ".SizeWarn", sizeMax * 80 / 100);
+ if(sizeWarn > sizeMax)
+ {
+ String s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")";
+ _instance.initializationData().logger.warning(s);
+ sizeWarn = sizeMax;
+ }
+
+ _size = size;
+ _sizeMax = sizeMax;
+ _sizeWarn = sizeWarn;
+
+ try
{
_threads = new java.util.ArrayList();
for(int i = 0; i < _size; i++)
{
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
+ EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
+ _threadIndex++);
_threads.add(thread);
- thread.start();
- ++_running;
+ thread.start();
+ ++_running;
}
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
destroy();
- joinWithAllThreads();
+ joinWithAllThreads();
throw ex;
}
}
@@ -144,8 +144,8 @@ public final class ThreadPool
}
assert(!_destroyed);
- assert(_handlerMap.isEmpty());
- assert(_changes.isEmpty());
+ assert(_handlerMap.isEmpty());
+ assert(_changes.isEmpty());
_destroyed = true;
setInterrupt();
}
@@ -157,7 +157,7 @@ public final class ThreadPool
{
trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
}
- assert(!_destroyed);
+ assert(!_destroyed);
_changes.add(new FdHandlerPair(fd, handler));
setInterrupt();
}
@@ -188,7 +188,7 @@ public final class ThreadPool
}
}
- assert(!_destroyed);
+ assert(!_destroyed);
_changes.add(new FdHandlerPair(fd, null));
setInterrupt();
}
@@ -198,64 +198,64 @@ public final class ThreadPool
{
if(_sizeMax > 1)
{
- synchronized(this)
- {
- assert(!_promote);
- _promote = true;
- notify();
-
- if(!_destroyed)
- {
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.initializationData().logger.warning(s);
- }
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
- {
- try
- {
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
- _threads.add(thread);
- thread.start();
- ++_running;
- }
- catch(RuntimeException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
+ synchronized(this)
+ {
+ assert(!_promote);
+ _promote = true;
+ notify();
+
+ if(!_destroyed)
+ {
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.initializationData().logger.warning(s);
+ }
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
+ {
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
+ _threadIndex++);
+ _threads.add(thread);
+ thread.start();
+ ++_running;
+ }
+ catch(RuntimeException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ }
+ }
+ }
+ }
}
}
public void
joinWithAllThreads()
{
- //
- // _threads is immutable after destroy() has been called,
- // therefore no synchronization is needed. (Synchronization
- // wouldn't be possible here anyway, because otherwise the
- // other threads would never terminate.)
- //
- java.util.Iterator i = _threads.iterator();
- while(i.hasNext())
- {
- EventHandlerThread thread = (EventHandlerThread)i.next();
-
+ //
+ // _threads is immutable after destroy() has been called,
+ // therefore no synchronization is needed. (Synchronization
+ // wouldn't be possible here anyway, because otherwise the
+ // other threads would never terminate.)
+ //
+ java.util.Iterator i = _threads.iterator();
+ while(i.hasNext())
+ {
+ EventHandlerThread thread = (EventHandlerThread)i.next();
+
while(true)
{
try
@@ -269,72 +269,72 @@ public final class ThreadPool
}
}
- //
- // Cleanup the selector, and the socket pair.
- //
- try
- {
- if(_selector != null)
- {
- try
- {
- _selector.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: Bad file descriptor" on
- // Mac OS X 10.3.x (it works fine on OS X 10.4.x)
- //
- }
- _selector = null;
- }
-
- if(_fdIntrWrite != null)
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: No such file or
- // directory" under Linux with JDK 1.4.2.
- //
- }
- _fdIntrWrite = null;
- }
-
- if(_fdIntrRead != null)
- {
- _fdIntrRead.close();
- _fdIntrRead = null;
- }
- }
- catch(java.io.IOException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
+ //
+ // Cleanup the selector, and the socket pair.
+ //
+ try
+ {
+ if(_selector != null)
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ //
+ // BUGFIX:
+ //
+ // Ignore this exception. This shouldn't happen
+ // but for some reasons the close() call raises
+ // "java.io.IOException: Bad file descriptor" on
+ // Mac OS X 10.3.x (it works fine on OS X 10.4.x)
+ //
+ }
+ _selector = null;
+ }
+
+ if(_fdIntrWrite != null)
+ {
+ try
+ {
+ _fdIntrWrite.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ //
+ // BUGFIX:
+ //
+ // Ignore this exception. This shouldn't happen
+ // but for some reasons the close() call raises
+ // "java.io.IOException: No such file or
+ // directory" under Linux with JDK 1.4.2.
+ //
+ }
+ _fdIntrWrite = null;
+ }
+
+ if(_fdIntrRead != null)
+ {
+ _fdIntrRead.close();
+ _fdIntrRead = null;
+ }
+ }
+ catch(java.io.IOException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ }
}
public String
prefix()
{
- return _prefix;
+ return _prefix;
}
private void
@@ -429,492 +429,492 @@ public final class ThreadPool
private boolean
run(BasicStream stream)
{
- if(_sizeMax > 1)
- {
- synchronized(this)
- {
- while(!_promote)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
- }
- }
-
- while(true)
+ if(_sizeMax > 1)
{
- if(TRACE_REGISTRATION)
- {
- java.util.Set keys = _selector.keys();
- trace("selecting on " + keys.size() + " channels:");
- java.util.Iterator i = keys.iterator();
- while(i.hasNext())
- {
- java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
- trace(" " + key.channel());
- }
- }
-
- EventHandler handler = null;
-
- //
- // Only call select() if there are no pending handlers with additional data
- // for us to read.
- //
- if(!_pendingHandlers.isEmpty())
- {
- handler = (EventHandler)_pendingHandlers.removeFirst();
- }
- else
- {
- select();
- }
-
- boolean finished = false;
- boolean shutdown = false;
-
- if(handler == null)
- {
- synchronized(this)
- {
- if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
- {
- if(TRACE_SELECT)
- {
- trace("timeout");
- }
-
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
- }
- else
- {
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
- {
- if(TRACE_SELECT || TRACE_INTERRUPT)
- {
- trace("detected interrupt");
- }
-
- //
- // There are two possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler was registered or unregistered.
- //
-
- //
- // Thread pool destroyed?
- //
- if(_destroyed)
- {
- if(TRACE_SHUTDOWN)
- {
- trace("destroyed, thread id = " + Thread.currentThread());
- }
-
- //
- // Don't clear the interrupt fd if
- // destroyed, so that the other threads
- // exit as well.
- //
- return true;
- }
-
- //
- // Remove the interrupt channel from the
- // selected key set.
- //
- _keys.remove(_fdIntrReadKey);
-
- clearInterrupt();
-
- //
- // An event handler must have been registered
- // or unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- continue;
- }
- else // Removal if handler is not set.
- {
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- // Don't continue; we have to call
- // finished() on the event handler below,
- // outside the thread synchronization.
- }
- }
- else
- {
- java.nio.channels.SelectionKey key = null;
- java.util.Iterator iter = _keys.iterator();
- while(iter.hasNext())
- {
- //
- // Ignore selection keys that have been cancelled
- //
- java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
- iter.remove();
- if(k.isValid() && k != _fdIntrReadKey)
- {
- if(TRACE_SELECT)
- {
- trace("found a key: " + keyToString(k));
- }
-
- key = k;
- break;
- }
- }
-
- if(key == null)
- {
- if(TRACE_SELECT)
- {
- trace("didn't find a valid key");
- }
-
- continue;
- }
-
- handler = (EventHandler)key.attachment();
- }
- }
- }
- }
-
- //
- // Now we are outside the thread synchronization.
- //
-
- if(shutdown)
- {
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
- //
- // Initiate server shutdown.
- //
- ObjectAdapterFactory factory;
- try
- {
- factory = _instance.objectAdapterFactory();
- }
- catch(Ice.CommunicatorDestroyedException e)
- {
- continue;
- }
-
- promoteFollower();
- factory.shutdown();
-
- //
- // No "continue", because we want shutdown to be done in
- // its own thread from this pool. Therefore we called
- // promoteFollower().
- //
- }
- else
- {
- assert(handler != null);
-
- if(finished)
- {
- //
- // Notify a handler about its removal from
- // the thread pool.
- //
- try
- {
- handler.finished(this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling finished():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
- }
-
- //
- // No "continue", because we want finished() to be
- // called in its own thread from this pool. Note
- // that this means that finished() must call
- // promoteFollower().
- //
- }
- else
- {
- //
- // If the handler is "readable", try to read a
- // message.
- //
- try
- {
- if(handler.readable())
- {
- try
- {
- //
- // If read returns true, the handler has more data for the thread pool
- // to process.
- //
- if(read(handler))
- {
- _pendingHandlers.add(handler);
- }
- }
- catch(Ice.TimeoutException ex) // Expected.
- {
- continue;
- }
- catch(Ice.DatagramLimitException ex) // Expected.
- {
- continue;
- }
- catch(Ice.SocketException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- continue;
- }
- catch(Ice.LocalException ex)
- {
- if(handler.datagram())
- {
- if(_instance.initializationData().properties.getPropertyAsInt(
- "Ice.Warn.Connections") > 0)
- {
- _instance.initializationData().logger.warning(
- "datagram connection exception:\n" + ex + "\n" + handler.toString());
- }
- }
- else
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- }
- continue;
- }
-
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
- }
-
- //
- // Provide a new message to the handler.
- //
- try
- {
- handler.message(stream, this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling message():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
- }
-
- //
- // No "continue", because we want message() to
- // be called in its own thread from this
- // pool. Note that this means that message()
- // must call promoteFollower().
- //
- }
- finally
- {
- stream.reset();
- }
- }
- }
-
- if(_sizeMax > 1)
- {
- synchronized(this)
- {
- if(!_destroyed)
- {
- //
- // First we reap threads that have been
- // destroyed before.
- //
- int sz = _threads.size();
- assert(_running <= sz);
- if(_running < sz)
- {
- java.util.Iterator i = _threads.iterator();
- while(i.hasNext())
- {
- EventHandlerThread thread = (EventHandlerThread)i.next();
-
- if(!thread.isAlive())
- {
- try
- {
- thread.join();
- i.remove();
- }
- catch(InterruptedException ex)
- {
- }
- }
- }
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
-
- //
- // The load factor jumps immediately to the number of
- // threads that are currently in use, but decays
- // exponentially if the number of threads in use is
- // smaller than the load factor. This reflects that we
- // create threads immediately when they are needed,
- // but want the number of threads to slowly decline to
- // the configured minimum.
- //
- double inUse = (double)_inUse;
- if(_load < inUse)
- {
- _load = inUse;
- }
- else
- {
- final double loadFactor = 0.05; // TODO: Configurable?
- final double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
- }
-
- if(_running > _size)
- {
- int load = (int)(_load + 0.5);
-
- //
- // We add one to the load factor because one
- // additional thread is needed for select().
- //
- if(load + 1 < _running)
- {
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
- }
-
- assert(_inUse > 0);
- --_inUse;
- }
-
- while(!_promote)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
- }
- }
+ synchronized(this)
+ {
+ while(!_promote)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _promote = false;
+ }
+
+ if(TRACE_THREAD)
+ {
+ trace("thread " + Thread.currentThread() + " has the lock");
+ }
+ }
+
+ while(true)
+ {
+ if(TRACE_REGISTRATION)
+ {
+ java.util.Set keys = _selector.keys();
+ trace("selecting on " + keys.size() + " channels:");
+ java.util.Iterator i = keys.iterator();
+ while(i.hasNext())
+ {
+ java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
+ trace(" " + key.channel());
+ }
+ }
+
+ EventHandler handler = null;
+
+ //
+ // Only call select() if there are no pending handlers with additional data
+ // for us to read.
+ //
+ if(!_pendingHandlers.isEmpty())
+ {
+ handler = (EventHandler)_pendingHandlers.removeFirst();
+ }
+ else
+ {
+ select();
+ }
+
+ boolean finished = false;
+ boolean shutdown = false;
+
+ if(handler == null)
+ {
+ synchronized(this)
+ {
+ if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
+ {
+ if(TRACE_SELECT)
+ {
+ trace("timeout");
+ }
+
+ assert(_timeout > 0);
+ _timeout = 0;
+ shutdown = true;
+ }
+ else
+ {
+ if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
+ {
+ if(TRACE_SELECT || TRACE_INTERRUPT)
+ {
+ trace("detected interrupt");
+ }
+
+ //
+ // There are two possiblities for an interrupt:
+ //
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if(_destroyed)
+ {
+ if(TRACE_SHUTDOWN)
+ {
+ trace("destroyed, thread id = " + Thread.currentThread());
+ }
+
+ //
+ // Don't clear the interrupt fd if
+ // destroyed, so that the other threads
+ // exit as well.
+ //
+ return true;
+ }
+
+ //
+ // Remove the interrupt channel from the
+ // selected key set.
+ //
+ _keys.remove(_fdIntrReadKey);
+
+ clearInterrupt();
+
+ //
+ // An event handler must have been registered
+ // or unregistered.
+ //
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if(change.handler != null) // Addition if handler is set.
+ {
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
+
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ continue;
+ }
+ else // Removal if handler is not set.
+ {
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ pair.key.cancel();
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ // Don't continue; we have to call
+ // finished() on the event handler below,
+ // outside the thread synchronization.
+ }
+ }
+ else
+ {
+ java.nio.channels.SelectionKey key = null;
+ java.util.Iterator iter = _keys.iterator();
+ while(iter.hasNext())
+ {
+ //
+ // Ignore selection keys that have been cancelled
+ //
+ java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
+ iter.remove();
+ if(k.isValid() && k != _fdIntrReadKey)
+ {
+ if(TRACE_SELECT)
+ {
+ trace("found a key: " + keyToString(k));
+ }
+
+ key = k;
+ break;
+ }
+ }
+
+ if(key == null)
+ {
+ if(TRACE_SELECT)
+ {
+ trace("didn't find a valid key");
+ }
+
+ continue;
+ }
+
+ handler = (EventHandler)key.attachment();
+ }
+ }
+ }
+ }
+
+ //
+ // Now we are outside the thread synchronization.
+ //
+
+ if(shutdown)
+ {
+ if(TRACE_SHUTDOWN)
+ {
+ trace("shutdown detected");
+ }
+
+ //
+ // Initiate server shutdown.
+ //
+ ObjectAdapterFactory factory;
+ try
+ {
+ factory = _instance.objectAdapterFactory();
+ }
+ catch(Ice.CommunicatorDestroyedException e)
+ {
+ continue;
+ }
+
+ promoteFollower();
+ factory.shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
+ }
+ else
+ {
+ assert(handler != null);
+
+ if(finished)
+ {
+ //
+ // Notify a handler about its removal from
+ // the thread pool.
+ //
+ try
+ {
+ handler.finished(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling finished():\n" +
+ sw.toString() + "\n" + handler.toString();
+ _instance.initializationData().logger.error(s);
+ }
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note
+ // that this means that finished() must call
+ // promoteFollower().
+ //
+ }
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ try
+ {
+ if(handler.readable())
+ {
+ try
+ {
+ //
+ // If read returns true, the handler has more data for the thread pool
+ // to process.
+ //
+ if(read(handler))
+ {
+ _pendingHandlers.add(handler);
+ }
+ }
+ catch(Ice.TimeoutException ex) // Expected.
+ {
+ continue;
+ }
+ catch(Ice.DatagramLimitException ex) // Expected.
+ {
+ continue;
+ }
+ catch(Ice.SocketException ex)
+ {
+ if(TRACE_EXCEPTION)
+ {
+ trace("informing handler (" + handler.getClass().getName() +
+ ") about exception " + ex);
+ ex.printStackTrace();
+ }
+
+ handler.exception(ex);
+ continue;
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(handler.datagram())
+ {
+ if(_instance.initializationData().properties.getPropertyAsInt(
+ "Ice.Warn.Connections") > 0)
+ {
+ _instance.initializationData().logger.warning(
+ "datagram connection exception:\n" + ex + "\n" + handler.toString());
+ }
+ }
+ else
+ {
+ if(TRACE_EXCEPTION)
+ {
+ trace("informing handler (" + handler.getClass().getName() +
+ ") about exception " + ex);
+ ex.printStackTrace();
+ }
+
+ handler.exception(ex);
+ }
+ continue;
+ }
+
+ stream.swap(handler._stream);
+ assert(stream.pos() == stream.size());
+ }
+
+ //
+ // Provide a new message to the handler.
+ //
+ try
+ {
+ handler.message(stream, this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling message():\n" +
+ sw.toString() + "\n" + handler.toString();
+ _instance.initializationData().logger.error(s);
+ }
+
+ //
+ // No "continue", because we want message() to
+ // be called in its own thread from this
+ // pool. Note that this means that message()
+ // must call promoteFollower().
+ //
+ }
+ finally
+ {
+ stream.reset();
+ }
+ }
+ }
+
+ if(_sizeMax > 1)
+ {
+ synchronized(this)
+ {
+ if(!_destroyed)
+ {
+ //
+ // First we reap threads that have been
+ // destroyed before.
+ //
+ int sz = _threads.size();
+ assert(_running <= sz);
+ if(_running < sz)
+ {
+ java.util.Iterator i = _threads.iterator();
+ while(i.hasNext())
+ {
+ EventHandlerThread thread = (EventHandlerThread)i.next();
+
+ if(!thread.isAlive())
+ {
+ try
+ {
+ thread.join();
+ i.remove();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ }
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+
+ //
+ // The load factor jumps immediately to the number of
+ // threads that are currently in use, but decays
+ // exponentially if the number of threads in use is
+ // smaller than the load factor. This reflects that we
+ // create threads immediately when they are needed,
+ // but want the number of threads to slowly decline to
+ // the configured minimum.
+ //
+ double inUse = (double)_inUse;
+ if(_load < inUse)
+ {
+ _load = inUse;
+ }
+ else
+ {
+ final double loadFactor = 0.05; // TODO: Configurable?
+ final double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+ }
+
+ if(_running > _size)
+ {
+ int load = (int)(_load + 0.5);
+
+ //
+ // We add one to the load factor because one
+ // additional thread is needed for select().
+ //
+ if(load + 1 < _running)
+ {
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
+
+ return false;
+ }
+ }
+
+ assert(_inUse > 0);
+ --_inUse;
+ }
+
+ while(!_promote)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _promote = false;
+ }
+
+ if(TRACE_THREAD)
+ {
+ trace("thread " + Thread.currentThread() + " has the lock");
+ }
+ }
}
}
private boolean
read(EventHandler handler)
{
- boolean moreData = false;
+ boolean moreData = false;
BasicStream stream = handler._stream;
@@ -931,50 +931,50 @@ public final class ThreadPool
}
int pos = stream.pos();
- if(pos < Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
- }
+ if(pos < Protocol.headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new Ice.IllegalMessageSizeException();
+ }
stream.pos(0);
- byte[] m = new byte[4];
- m[0] = stream.readByte();
- m[1] = stream.readByte();
- m[2] = stream.readByte();
- m[3] = stream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
- || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
-
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
- {
- Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = Protocol.protocolMajor;
- e.minor = Protocol.protocolMinor;
- throw e;
- }
-
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
- {
- Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = Protocol.encodingMajor;
- e.minor = Protocol.encodingMinor;
- throw e;
- }
+ byte[] m = new byte[4];
+ m[0] = stream.readByte();
+ m[1] = stream.readByte();
+ m[2] = stream.readByte();
+ m[3] = stream.readByte();
+ if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
+ || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
+ {
+ Ice.BadMagicException ex = new Ice.BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+
+ byte pMajor = stream.readByte();
+ byte pMinor = stream.readByte();
+ if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
+ {
+ Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
+ e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
+ e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
+ e.major = Protocol.protocolMajor;
+ e.minor = Protocol.protocolMinor;
+ throw e;
+ }
+
+ byte eMajor = stream.readByte();
+ byte eMinor = stream.readByte();
+ if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
+ {
+ Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
+ e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
+ e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
+ e.major = Protocol.encodingMajor;
+ e.minor = Protocol.encodingMinor;
+ throw e;
+ }
byte messageType = stream.readByte();
byte compress = stream.readByte();
@@ -994,26 +994,26 @@ public final class ThreadPool
stream.pos(pos);
if(stream.pos() != stream.size())
- {
- if(handler.datagram())
- {
- if(_warnUdp)
- {
- _instance.initializationData().logger.warning("DatagramLimitException: maximum size of "
- + stream.pos() + " exceeded");
- }
- stream.pos(0);
- stream.resize(0, true);
- throw new Ice.DatagramLimitException();
- }
- else
- {
- moreData = handler.read(stream);
- assert(stream.pos() == stream.size());
- }
+ {
+ if(handler.datagram())
+ {
+ if(_warnUdp)
+ {
+ _instance.initializationData().logger.warning("DatagramLimitException: maximum size of "
+ + stream.pos() + " exceeded");
+ }
+ stream.pos(0);
+ stream.resize(0, true);
+ throw new Ice.DatagramLimitException();
+ }
+ else
+ {
+ moreData = handler.read(stream);
+ assert(stream.pos() == stream.size());
+ }
}
- return moreData;
+ return moreData;
}
/*
@@ -1070,13 +1070,13 @@ public final class ThreadPool
Ice.SocketException se = new Ice.SocketException();
se.initCause(ex);
//throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ se.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ continue;
}
}
}
@@ -1096,14 +1096,14 @@ public final class ThreadPool
trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread());
}
- if(_timeout > 0)
- {
- ret = _selector.select(_timeout * 1000);
- }
- else
- {
- ret = _selector.select();
- }
+ if(_timeout > 0)
+ {
+ ret = _selector.select(_timeout * 1000);
+ }
+ else
+ {
+ ret = _selector.select();
+ }
}
catch(java.io.IOException ex)
{
@@ -1121,13 +1121,13 @@ public final class ThreadPool
Ice.SocketException se = new Ice.SocketException();
se.initCause(ex);
//throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ se.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ continue;
}
if(TRACE_SELECT)
@@ -1228,18 +1228,18 @@ public final class ThreadPool
public void
run()
{
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.start();
- }
+ if(_instance.initializationData().threadHook != null)
+ {
+ _instance.initializationData().threadHook.start();
+ }
BasicStream stream = new BasicStream(_instance);
- boolean promote;
+ boolean promote;
try
{
- promote = ThreadPool.this.run(stream);
+ promote = ThreadPool.this.run(stream);
}
catch(Ice.LocalException ex)
{
@@ -1249,7 +1249,7 @@ public final class ThreadPool
pw.flush();
String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.initializationData().logger.error(s);
- promote = true;
+ promote = true;
}
catch(java.lang.Exception ex)
{
@@ -1259,32 +1259,32 @@ public final class ThreadPool
pw.flush();
String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
_instance.initializationData().logger.error(s);
- promote = true;
+ promote = true;
}
- if(promote && _sizeMax > 1)
- {
- //
- // Promote a follower, but w/o modifying _inUse or
- // creating new threads.
- //
- synchronized(ThreadPool.this)
- {
- assert(!_promote);
- _promote = true;
- ThreadPool.this.notify();
- }
- }
+ if(promote && _sizeMax > 1)
+ {
+ //
+ // Promote a follower, but w/o modifying _inUse or
+ // creating new threads.
+ //
+ synchronized(ThreadPool.this)
+ {
+ assert(!_promote);
+ _promote = true;
+ ThreadPool.this.notify();
+ }
+ }
if(TRACE_THREAD)
{
trace("run() terminated");
}
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.stop();
- }
+ if(_instance.initializationData().threadHook != null)
+ {
+ _instance.initializationData().threadHook.stop();
+ }
}
}