diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/ThreadPool.java | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 173 |
1 files changed, 120 insertions, 53 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 30ed86a3082..6306ca10475 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -146,6 +146,7 @@ public final class ThreadPool assert(!_destroyed); assert(_handlerMap.isEmpty()); assert(_changes.isEmpty()); + assert(_workItems.isEmpty()); _destroyed = true; setInterrupt(); } @@ -193,6 +194,14 @@ public final class ThreadPool setInterrupt(); } + public synchronized void + execute(ThreadPoolWorkItem workItem) + { + assert(!_destroyed); + _workItems.add(workItem); + setInterrupt(); + } + public void promoteFollower() { @@ -468,6 +477,7 @@ public final class ThreadPool } EventHandler handler = null; + ThreadPoolWorkItem workItem = null; // // Only call select() if there are no pending handlers with additional data @@ -510,14 +520,15 @@ public final class ThreadPool } // - // There are two possiblities for an interrupt: + // There are three possiblities for an interrupt: // // 1. The thread pool has been destroyed. // // 2. An event handler was registered or unregistered. // - + // 3. A work item has been scheduled. // + // Thread pool destroyed? // if(_destroyed) @@ -547,57 +558,79 @@ public final class ThreadPool // An event handler must have been registered // or unregistered. // - assert(!_changes.isEmpty()); - FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. + if(!_changes.isEmpty()) { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if(change.handler != null) // Addition if handler is set. { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } + int op; + if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + { + op = java.nio.channels.SelectionKey.OP_READ; + } + else + { + op = java.nio.channels.SelectionKey.OP_ACCEPT; + } - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); + java.nio.channels.SelectionKey key = null; + try + { + key = change.fd.register(_selector, op, change.handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + // + // This is expected if the transceiver finishConnect() call failed + // and the connection is a background connection. + // + } + _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); + + // + // If the handler is readable and already has some data to read add it + // to the _pendingHandlers list to ensure it will be processed. + // + if(change.handler.readable() && change.handler.hasMoreData()) + { + _pendingHandlers.add(change.handler); + } + + if(TRACE_REGISTRATION) + { + trace("added handler (" + change.handler.getClass().getName() + ") for fd " + + change.fd); + } + + continue; } - catch(java.nio.channels.ClosedChannelException ex) + else // Removal if handler is not set. { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); + HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); + assert(pair != null); + handler = pair.handler; + finished = true; + if(pair.key != null) + { + pair.key.cancel(); + } - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } + if(TRACE_REGISTRATION) + { + trace("removed handler (" + handler.getClass().getName() + ") for fd " + + change.fd); + } - continue; + // Don't continue; we have to call + // finished() on the event handler below, + // outside the thread synchronization. + } } - else // Removal if handler is not set. + else { - HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + - change.fd); - } - - // Don't continue; we have to call - // finished() on the event handler below, - // outside the thread synchronization. + assert(!_workItems.isEmpty()); + workItem = (ThreadPoolWorkItem)_workItems.removeFirst(); } } else @@ -672,6 +705,29 @@ public final class ThreadPool // promoteFollower(). // } + else if(workItem != null) + { + try + { + workItem.execute(this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString(); + _instance.initializationData().logger.error(s); + } + + // + // No "continue", because we want execute() to + // be called in its own thread from this + // pool. Note that this means that execute() + // must call promoteFollower(). + // + } else { assert(handler != null); @@ -716,17 +772,23 @@ public final class ThreadPool { try { + if(!read(handler)) + { + continue; // Can't read without blocking. + } + // - // If read returns true, the handler has more data for the thread pool - // to process. + // If the handler has more data to process add it to the _pendingHandlers list + // to ensure it will be processed. // - if(read(handler)) + if(handler.hasMoreData()) { _pendingHandlers.add(handler); } } - catch(Ice.TimeoutException ex) // Expected. + catch(Ice.TimeoutException ex) { + assert(false); // This shouldn't occur as we only perform non-blocking reads. continue; } catch(Ice.DatagramLimitException ex) // Expected. @@ -914,8 +976,6 @@ public final class ThreadPool private boolean read(EventHandler handler) { - boolean moreData = false; - BasicStream stream = handler._stream; if(stream.size() == 0) @@ -926,7 +986,10 @@ public final class ThreadPool if(stream.pos() != stream.size()) { - moreData = handler.read(stream); + if(!handler.read(stream)) + { + return false; + } assert(stream.pos() == stream.size()); } @@ -1008,12 +1071,15 @@ public final class ThreadPool } else { - moreData = handler.read(stream); + if(!handler.read(stream)) + { + return false; + } assert(stream.pos() == stream.size()); } } - return moreData; + return true; } /* @@ -1205,6 +1271,7 @@ public final class ThreadPool private java.util.Set _keys; private java.util.LinkedList _changes = new java.util.LinkedList(); + private java.util.LinkedList _workItems = new java.util.LinkedList(); private java.util.HashMap _handlerMap = new java.util.HashMap(); |