summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
commitc6dbd090d9691cc0116a2967b2827b858b184dfe (patch)
tree6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /java/src/IceInternal/ThreadPool.java
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java660
1 files changed, 142 insertions, 518 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index f38ea735b6f..6f19615ecf0 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -26,11 +26,13 @@ public final class ThreadPool
_destroyed = false;
_prefix = prefix;
_timeout = timeout;
+ _selector = new Selector(instance, timeout);
_threadIndex = 0;
_running = 0;
_inUse = 0;
_load = 1.0;
_promote = true;
+ _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
_warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
@@ -43,29 +45,6 @@ public final class ThreadPool
_programNamePrefix = "";
}
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
-
- try
- {
- _selector = java.nio.channels.Selector.open();
- pair.source.configureBlocking(false);
- _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
- }
- catch(java.io.IOException ex)
- {
- Ice.SyscallException sys = new Ice.SyscallException();
- sys.initCause(ex);
- throw sys;
- }
-
- //
- // The Selector holds a Set representing the selected keys. The
- // Set reference doesn't change, so we obtain it once here.
- //
- _keys = _selector.selectedKeys();
-
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
@@ -144,53 +123,71 @@ public final class ThreadPool
}
assert(!_destroyed);
- assert(_handlerMap.isEmpty());
_destroyed = true;
- setInterrupt();
+ _selector.setInterrupt();
}
public synchronized void
- _register(java.nio.channels.SelectableChannel fd, EventHandler handler)
+ _register(EventHandler handler)
{
- if(TRACE_REGISTRATION)
+ assert(!_destroyed);
+
+ if(!handler._registered)
{
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
+ if(TRACE_REGISTRATION)
+ {
+ trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd());
+ }
+
+ if(!handler._serializing)
+ {
+ _selector.add(handler, SocketStatus.NeedRead);
+ }
+ handler._registered = true;
}
- assert(!_destroyed);
- _changes.add(new FdHandlerPair(fd, handler));
- setInterrupt();
}
public synchronized void
- unregister(java.nio.channels.SelectableChannel fd)
+ unregister(EventHandler handler)
{
- if(TRACE_REGISTRATION)
+ assert(!_destroyed);
+ if(handler._registered)
{
- if(TRACE_STACK_TRACE)
+ if(TRACE_REGISTRATION)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- }
- trace("removing handler for channel " + fd + "\n" + sw.toString());
+ trace("removing handler for channel " + handler.fd());
}
- else
+
+ if(!handler._serializing)
{
- trace("removing handler for channel " + fd);
+ _selector.remove(handler);
}
+ handler._registered = false;
}
+ }
+ public synchronized void
+ finish(EventHandler handler)
+ {
assert(!_destroyed);
- _changes.add(new FdHandlerPair(fd, null));
- setInterrupt();
- }
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("finishing handler for channel " + handler.fd());
+ }
+
+ if(handler._registered)
+ {
+ if(!handler._serializing)
+ {
+ _selector.remove(handler);
+ }
+ handler._registered = false;
+ }
+
+ _finished.add(handler);
+ _selector.setInterrupt();
+ }
public synchronized void
execute(ThreadPoolWorkItem workItem)
@@ -200,16 +197,25 @@ public final class ThreadPool
throw new Ice.CommunicatorDestroyedException();
}
_workItems.add(workItem);
- setInterrupt();
+ _selector.setInterrupt();
}
public void
- promoteFollower()
+ promoteFollower(EventHandler handler)
{
if(_sizeMax > 1)
{
synchronized(this)
{
+ if(_serialize && handler != null)
+ {
+ handler._serializing = true;
+ if(handler._registered)
+ {
+ _selector.remove(handler);
+ }
+ }
+
assert(!_promote);
_promote = true;
notify();
@@ -280,65 +286,9 @@ public final class ThreadPool
}
//
- // Cleanup the selector, and the socket pair.
+ // Destroy the selector
//
- try
- {
- if(_selector != null)
- {
- try
- {
- _selector.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: Bad file descriptor" on
- // Mac OS X 10.3.x (it works fine on OS X 10.4.x)
- //
- }
- _selector = null;
- }
-
- if(_fdIntrWrite != null)
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: No such file or
- // directory" under Linux with JDK 1.4.2.
- //
- }
- _fdIntrWrite = null;
- }
-
- if(_fdIntrRead != null)
- {
- _fdIntrRead.close();
- _fdIntrRead = null;
- }
- }
- catch(java.io.IOException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
+ _selector.destroy();
}
public String
@@ -346,91 +296,6 @@ public final class ThreadPool
{
return _prefix;
}
-
- private void
- clearInterrupt()
- {
- if(TRACE_INTERRUPT)
- {
- trace("clearInterrupt");
- if(TRACE_STACK_TRACE)
- {
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- ex.printStackTrace();
- }
- }
- }
-
- byte b = 0;
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- try
- {
- while(true)
- {
- buf.rewind();
- if(_fdIntrRead.read(buf) != 1)
- {
- break;
- }
-
- if(TRACE_INTERRUPT)
- {
- trace("clearInterrupt got byte " + (int)buf.get(0));
- }
-
- b = buf.get(0);
- break;
- }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- private void
- setInterrupt()
- {
- if(TRACE_INTERRUPT)
- {
- trace("setInterrupt()");
- if(TRACE_STACK_TRACE)
- {
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- ex.printStackTrace();
- }
- }
- }
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
- {
- try
- {
- _fdIntrWrite.write(buf);
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
- }
//
// Each thread supplies a BasicStream, to avoid creating excessive
@@ -465,206 +330,85 @@ public final class ThreadPool
while(true)
{
- if(TRACE_REGISTRATION)
- {
- java.util.Set<java.nio.channels.SelectionKey> keys = _selector.keys();
- trace("selecting on " + keys.size() + " channels:");
- java.util.Iterator<java.nio.channels.SelectionKey> i = keys.iterator();
- while(i.hasNext())
- {
- java.nio.channels.SelectionKey key = i.next();
- trace(" " + key.channel());
- }
- }
-
- EventHandler handler = null;
- ThreadPoolWorkItem workItem = null;
-
- //
- // Only call select() if there are no pending handlers with additional data
- // for us to read.
- //
- if(!_pendingHandlers.isEmpty())
+ try
{
- handler = _pendingHandlers.removeFirst();
+ _selector.select();
}
- else
+ catch(java.io.IOException ex)
{
- select();
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ //throw se;
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ se.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ continue;
}
+ EventHandler handler = null;
+ ThreadPoolWorkItem workItem = null;
boolean finished = false;
boolean shutdown = false;
- if(handler == null)
+ synchronized(this)
{
- synchronized(this)
+ if(_selector.checkTimeout())
+ {
+ assert(_timeout > 0);
+ shutdown = true;
+ }
+ else if(_selector.isInterrupted())
{
- if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
+ if(_selector.processInterrupt())
{
- if(TRACE_SELECT)
- {
- trace("timeout");
- }
-
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
+ continue;
+ }
+
+ //
+ // There are three possiblities for an interrupt:
+ //
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler is being finished.
+ //
+ // 3. A work item has been scheduled.
+ //
+
+ if(!_finished.isEmpty())
+ {
+ _selector.clearInterrupt();
+ handler = _finished.removeFirst();
+ finished = true;
+ }
+ else if(!_workItems.isEmpty())
+ {
+ //
+ // Work items must be executed first even if the thread pool is destroyed.
+ //
+ _selector.clearInterrupt();
+ workItem = _workItems.removeFirst();
+ }
+ else if(_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that the other threads exit as well.
+ //
+ return true;
}
else
{
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
- {
- if(TRACE_SELECT || TRACE_INTERRUPT)
- {
- trace("detected interrupt");
- }
-
- //
- // There are three possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler was registered or unregistered.
- //
- // 3. A work item has been scheduled.
- //
-
- if(!_workItems.isEmpty())
- {
- //
- // Work items must be executed first even if the thread pool is destroyed.
- //
-
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
- clearInterrupt();
- assert(!_workItems.isEmpty());
- workItem = _workItems.removeFirst();
- }
- else if(_destroyed)
- {
- if(TRACE_SHUTDOWN)
- {
- trace("destroyed, thread id = " + Thread.currentThread());
- }
-
- //
- // Don't clear the interrupt fd if destroyed, so that the other threads exit as well.
- //
- return true;
- }
- else
- {
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
- clearInterrupt();
-
- //
- // An event handler must have been registered or unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = _changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- //
- // If the handler is readable and already has some data to read add it
- // to the _pendingHandlers list to ensure it will be processed.
- //
- if(change.handler.readable() && change.handler.hasMoreData())
- {
- _pendingHandlers.add(change.handler);
- }
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- continue;
- }
- else // Removal if handler is not set.
- {
- HandlerKeyPair pair = _handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- // Don't continue; we have to call
- // finished() on the event handler below,
- // outside the thread synchronization.
- }
- }
- }
- else
- {
- java.nio.channels.SelectionKey key = null;
- java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator();
- while(iter.hasNext())
- {
- //
- // Ignore selection keys that have been cancelled
- //
- java.nio.channels.SelectionKey k = iter.next();
- iter.remove();
- if(k.isValid() && k != _fdIntrReadKey)
- {
- if(TRACE_SELECT)
- {
- trace("found a key: " + keyToString(k));
- }
-
- key = k;
- break;
- }
- }
-
- if(key == null)
- {
- if(TRACE_SELECT)
- {
- trace("didn't find a valid key");
- }
-
- continue;
- }
-
- handler = (EventHandler)key.attachment();
- }
+ assert(false);
+ }
+ }
+ else
+ {
+ handler = (EventHandler)_selector.getNextSelected();
+ if(handler == null)
+ {
+ continue;
}
}
}
@@ -675,11 +419,6 @@ public final class ThreadPool
if(shutdown)
{
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
//
// Initiate server shutdown.
//
@@ -693,7 +432,7 @@ public final class ThreadPool
continue;
}
- promoteFollower();
+ promoteFollower(null);
factory.shutdown();
//
@@ -732,8 +471,7 @@ public final class ThreadPool
if(finished)
{
//
- // Notify a handler about its removal from
- // the thread pool.
+ // Notify a handler about its removal from the thread pool.
//
try
{
@@ -773,14 +511,10 @@ public final class ThreadPool
{
continue; // Can't read without blocking.
}
-
- //
- // If the handler has more data to process add it to the _pendingHandlers list
- // to ensure it will be processed.
- //
+
if(handler.hasMoreData())
{
- _pendingHandlers.add(handler);
+ _selector.hasMoreData(handler);
}
}
catch(Ice.TimeoutException ex)
@@ -871,6 +605,15 @@ public final class ThreadPool
{
if(!_destroyed)
{
+ if(_serialize && handler != null && handler._serializing)
+ {
+ if(handler._registered)
+ {
+ _selector.add(handler, SocketStatus.NeedRead);
+ }
+ handler._serializing = false;
+ }
+
//
// First we reap threads that have been
// destroyed before.
@@ -1146,84 +889,6 @@ public final class ThreadPool
*/
private void
- select()
- {
- int ret = 0;
- int spuriousWakeUp = 0;
- while(true)
- {
- try
- {
- if(TRACE_SELECT)
- {
- trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread());
- }
-
- if(_timeout > 0)
- {
- ret = _selector.select(_timeout * 1000);
- }
- else
- {
- ret = _selector.select();
- }
- }
- catch(java.io.IOException ex)
- {
- //
- // Pressing Ctrl-C causes select() to raise an
- // IOException, which seems like a JDK bug. We trap
- // for that special case here and ignore it.
- // Hopefully we're not masking something important!
- //
- if(Network.interrupted(ex))
- {
- continue;
- }
-
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
- }
-
- if(TRACE_SELECT)
- {
- trace("select() returned " + ret + ", _keys.size() = " + _keys.size());
- }
-
- if(ret == 0 && _timeout <= 0)
- {
- //
- // This is necessary to prevent a busy loop in case of a spurious wake-up which
- // sometime occurs in the client thread pool when the communicator is destroyed.
- // If there are too many successive spurious wake-ups, we log an error.
- //
- try
- {
- Thread.currentThread().sleep(1);
- }
- catch(java.lang.InterruptedException ex)
- {
- }
-
- if(++spuriousWakeUp > 100)
- {
- _instance.initializationData().logger.error("spurious selector wake up in `" + _prefix + "'");
- }
- }
-
- break;
- }
- }
-
- private void
trace(String msg)
{
System.err.println(_prefix + ": " + msg);
@@ -1253,57 +918,15 @@ public final class ThreadPool
return key.channel() + " " + ops;
}
- private static final class FdHandlerPair
- {
- java.nio.channels.SelectableChannel fd;
- EventHandler handler;
-
- FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler)
- {
- this.fd = fd;
- this.handler = handler;
- }
- }
-
- private static final class HandlerKeyPair
- {
- EventHandler handler;
- java.nio.channels.SelectionKey key;
-
- HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key)
- {
- this.handler = handler;
- this.key = key;
- }
- }
-
private Instance _instance;
private boolean _destroyed;
private final String _prefix;
private final String _programNamePrefix;
-
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.SelectionKey _fdIntrReadKey;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
- private java.nio.channels.Selector _selector;
- private java.util.Set<java.nio.channels.SelectionKey> _keys;
-
- private java.util.LinkedList<FdHandlerPair> _changes = new java.util.LinkedList<FdHandlerPair>();
+ private final Selector _selector;
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
-
- private java.util.Map<java.nio.channels.SelectableChannel, HandlerKeyPair> _handlerMap =
- new java.util.HashMap<java.nio.channels.SelectableChannel, HandlerKeyPair>();
-
+ private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>();
private int _timeout;
- //
- // Since the Java5 SSL transceiver can read more data from the socket than is
- // actually requested, we have to keep a separate list of handlers that need
- // the thread pool to read more data before it re-enters a blocking call to
- // select().
- //
- private java.util.LinkedList<EventHandler> _pendingHandlers = new java.util.LinkedList<EventHandler>();
-
private final class EventHandlerThread extends Thread
{
EventHandlerThread(String name)
@@ -1377,6 +1000,7 @@ public final class ThreadPool
private final int _size; // Number of threads that are pre-created.
private final int _sizeMax; // Maximum number of threads.
private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+ private final boolean _serialize; // True if requests need to be serialized over the connection.
private java.util.List<EventHandlerThread> _threads; // All threads, running or not.
private int _threadIndex; // For assigning thread names.