diff options
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 152 |
1 files changed, 124 insertions, 28 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 03e3713fe93..8ab584e9623 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -25,7 +25,7 @@ public final class ThreadPool } } } - + static final class FinishedWorkItem implements ThreadPoolWorkItem { public @@ -60,6 +60,16 @@ public final class ThreadPool private final EventHandlerThread _thread; } + static final class InterruptWorkItem implements ThreadPoolWorkItem + { + public void execute(ThreadPoolCurrent current) + { + // Nothing to do, this is just used to interrupt the thread pool selector. + } + } + + private static ThreadPoolWorkItem _interruptWorkItem = new InterruptWorkItem(); + // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -257,7 +267,33 @@ public final class ThreadPool update(EventHandler handler, int remove, int add) { assert(!_destroyed); + + // Don't remove what needs to be added + remove &= ~add; + + // Don't remove/add if already un-registered or registered + remove = handler._registered & remove; + add = ~handler._registered & add; + if(remove == add) + { + return; + } + _selector.update(handler, remove, add); + + if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value && + (handler._disabled & SocketOperation.Read) == 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(handler); + } + else if((remove & SocketOperation.Read) != 0) + { + _pendingHandlers.remove(handler); + } } public void @@ -312,6 +348,8 @@ public final class ThreadPool { ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread); boolean select = false; + java.util.List<EventHandlerOpPair> handlers = new java.util.ArrayList<EventHandlerOpPair>(); + while(true) { if(current._handler != null) @@ -335,7 +373,7 @@ public final class ThreadPool { try { - _selector.select(_serverIdleTime); + _selector.select(handlers, _serverIdleTime); } catch(Selector.TimeoutException ex) { @@ -356,8 +394,25 @@ public final class ThreadPool { if(select) { - _selector.finishSelect(_handlers, _serverIdleTime); + java.util.List<EventHandlerOpPair> tmp = _handlers; + _handlers = handlers; + handlers = tmp; + + if(!_pendingHandlers.isEmpty()) + { + for(EventHandlerOpPair pair : _handlers) + { + _pendingHandlers.remove(pair.handler); + } + for(EventHandler p : _pendingHandlers) + { + _handlers.add(new EventHandlerOpPair(p, SocketOperation.Read)); + } + _pendingHandlers.clear(); + } + _nextHandler = _handlers.iterator(); + _selector.finishSelect(); select = false; } else if(!current._leader && followerWait(current)) @@ -375,9 +430,14 @@ public final class ThreadPool // --_inUseIO; - if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) { - _selector.hasMoreData(current._handler); + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); + } + _pendingHandlers.add(current._handler); } } else @@ -386,7 +446,19 @@ public final class ThreadPool // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - _selector.enable(current._handler, current.operation); + if(_serialize) + { + _selector.enable(current._handler, current.operation); + if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); + } + } assert(_inUse > 0); --_inUse; } @@ -396,20 +468,34 @@ public final class ThreadPool return; // Wait timed-out. } } - else if(!current._ioCompleted && - (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + else if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) { - _selector.hasMoreData(current._handler); + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); } // // Get the next ready handler. // - if(_nextHandler.hasNext()) + EventHandlerOpPair next = null; + while(_nextHandler.hasNext()) + { + EventHandlerOpPair n = _nextHandler.next(); + if((n.op & n.handler._registered) != 0) + { + next = n; + break; + } + } + if(next != null) { current._ioCompleted = false; - current._handler = _nextHandler.next(); - current.operation = current._handler._ready; + current._handler = next.handler; + current.operation = next.op; thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); } else @@ -431,6 +517,7 @@ public final class ThreadPool } else { + _handlers.clear(); _selector.startSelect(); select = true; thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); @@ -462,17 +549,29 @@ public final class ThreadPool if(_sizeMax > 1) { --_inUseIO; - - if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + + if(!_destroyed) { - _selector.hasMoreData(current._handler); + if(_serialize) + { + _selector.disable(current._handler, current.operation); + + // Make sure the handler isn't in the set of pending handlers (this can + // for example occur if the handler is has more data and its added by + // ThreadPool::update while we were processing IO). + _pendingHandlers.remove(current._handler); + } + else if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); + } } - - if(_serialize && !_destroyed) - { - _selector.disable(current._handler, current.operation); - } - + if(current._leader) { // @@ -527,10 +626,6 @@ public final class ThreadPool } } } - else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) - { - _selector.hasMoreData(current._handler); - } } private synchronized void @@ -563,7 +658,7 @@ public final class ThreadPool // // Wait to be promoted and for all the IO threads to be done. // - while(!_promote || _inUseIO == _sizeIO || !_nextHandler.hasNext() && _inUseIO > 0) + while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0)) { try { @@ -740,8 +835,9 @@ public final class ThreadPool private int _inUse; // Number of threads that are currently in use. private int _inUseIO; // Number of threads that are currently performing IO. - private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>(); - private java.util.Iterator<EventHandler> _nextHandler; + private java.util.List<EventHandlerOpPair> _handlers = new java.util.ArrayList<EventHandlerOpPair>(); + private java.util.Iterator<EventHandlerOpPair> _nextHandler; + private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>(); private boolean _promote; } |