summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
committerBenoit Foucher <benoit@zeroc.com>2007-11-27 11:58:35 +0100
commit47f800495093fd7679a315e2d730fea22f6135b7 (patch)
treea7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/ThreadPool.java
parentFixed SystemException to no longer derive from LocalException (diff)
downloadice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2
ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz
ice-47f800495093fd7679a315e2d730fea22f6135b7.zip
- Added support for non-blocking AMI/batch requests, connection
creation. - Added support for AMI oneway requests. - Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java173
1 files changed, 120 insertions, 53 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 30ed86a3082..6306ca10475 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -146,6 +146,7 @@ public final class ThreadPool
assert(!_destroyed);
assert(_handlerMap.isEmpty());
assert(_changes.isEmpty());
+ assert(_workItems.isEmpty());
_destroyed = true;
setInterrupt();
}
@@ -193,6 +194,14 @@ public final class ThreadPool
setInterrupt();
}
+ public synchronized void
+ execute(ThreadPoolWorkItem workItem)
+ {
+ assert(!_destroyed);
+ _workItems.add(workItem);
+ setInterrupt();
+ }
+
public void
promoteFollower()
{
@@ -468,6 +477,7 @@ public final class ThreadPool
}
EventHandler handler = null;
+ ThreadPoolWorkItem workItem = null;
//
// Only call select() if there are no pending handlers with additional data
@@ -510,14 +520,15 @@ public final class ThreadPool
}
//
- // There are two possiblities for an interrupt:
+ // There are three possiblities for an interrupt:
//
// 1. The thread pool has been destroyed.
//
// 2. An event handler was registered or unregistered.
//
-
+ // 3. A work item has been scheduled.
//
+
// Thread pool destroyed?
//
if(_destroyed)
@@ -547,57 +558,79 @@ public final class ThreadPool
// An event handler must have been registered
// or unregistered.
//
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
+ if(!_changes.isEmpty())
{
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if(change.handler != null) // Addition if handler is set.
{
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ //
+ // This is expected if the transceiver finishConnect() call failed
+ // and the connection is a background connection.
+ //
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+
+ //
+ // If the handler is readable and already has some data to read add it
+ // to the _pendingHandlers list to ensure it will be processed.
+ //
+ if(change.handler.readable() && change.handler.hasMoreData())
+ {
+ _pendingHandlers.add(change.handler);
+ }
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ continue;
}
- catch(java.nio.channels.ClosedChannelException ex)
+ else // Removal if handler is not set.
{
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ if(pair.key != null)
+ {
+ pair.key.cancel();
+ }
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
- continue;
+ // Don't continue; we have to call
+ // finished() on the event handler below,
+ // outside the thread synchronization.
+ }
}
- else // Removal if handler is not set.
+ else
{
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- // Don't continue; we have to call
- // finished() on the event handler below,
- // outside the thread synchronization.
+ assert(!_workItems.isEmpty());
+ workItem = (ThreadPoolWorkItem)_workItems.removeFirst();
}
}
else
@@ -672,6 +705,29 @@ public final class ThreadPool
// promoteFollower().
//
}
+ else if(workItem != null)
+ {
+ try
+ {
+ workItem.execute(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ }
+
+ //
+ // No "continue", because we want execute() to
+ // be called in its own thread from this
+ // pool. Note that this means that execute()
+ // must call promoteFollower().
+ //
+ }
else
{
assert(handler != null);
@@ -716,17 +772,23 @@ public final class ThreadPool
{
try
{
+ if(!read(handler))
+ {
+ continue; // Can't read without blocking.
+ }
+
//
- // If read returns true, the handler has more data for the thread pool
- // to process.
+ // If the handler has more data to process add it to the _pendingHandlers list
+ // to ensure it will be processed.
//
- if(read(handler))
+ if(handler.hasMoreData())
{
_pendingHandlers.add(handler);
}
}
- catch(Ice.TimeoutException ex) // Expected.
+ catch(Ice.TimeoutException ex)
{
+ assert(false); // This shouldn't occur as we only perform non-blocking reads.
continue;
}
catch(Ice.DatagramLimitException ex) // Expected.
@@ -914,8 +976,6 @@ public final class ThreadPool
private boolean
read(EventHandler handler)
{
- boolean moreData = false;
-
BasicStream stream = handler._stream;
if(stream.size() == 0)
@@ -926,7 +986,10 @@ public final class ThreadPool
if(stream.pos() != stream.size())
{
- moreData = handler.read(stream);
+ if(!handler.read(stream))
+ {
+ return false;
+ }
assert(stream.pos() == stream.size());
}
@@ -1008,12 +1071,15 @@ public final class ThreadPool
}
else
{
- moreData = handler.read(stream);
+ if(!handler.read(stream))
+ {
+ return false;
+ }
assert(stream.pos() == stream.size());
}
}
- return moreData;
+ return true;
}
/*
@@ -1205,6 +1271,7 @@ public final class ThreadPool
private java.util.Set _keys;
private java.util.LinkedList _changes = new java.util.LinkedList();
+ private java.util.LinkedList _workItems = new java.util.LinkedList();
private java.util.HashMap _handlerMap = new java.util.HashMap();