summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java152
1 files changed, 124 insertions, 28 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 03e3713fe93..8ab584e9623 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -25,7 +25,7 @@ public final class ThreadPool
}
}
}
-
+
static final class FinishedWorkItem implements ThreadPoolWorkItem
{
public
@@ -60,6 +60,16 @@ public final class ThreadPool
private final EventHandlerThread _thread;
}
+ static final class InterruptWorkItem implements ThreadPoolWorkItem
+ {
+ public void execute(ThreadPoolCurrent current)
+ {
+ // Nothing to do, this is just used to interrupt the thread pool selector.
+ }
+ }
+
+ private static ThreadPoolWorkItem _interruptWorkItem = new InterruptWorkItem();
+
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -257,7 +267,33 @@ public final class ThreadPool
update(EventHandler handler, int remove, int add)
{
assert(!_destroyed);
+
+ // Don't remove what needs to be added
+ remove &= ~add;
+
+ // Don't remove/add if already un-registered or registered
+ remove = handler._registered & remove;
+ add = ~handler._registered & add;
+ if(remove == add)
+ {
+ return;
+ }
+
_selector.update(handler, remove, add);
+
+ if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value &&
+ (handler._disabled & SocketOperation.Read) == 0)
+ {
+ if(_pendingHandlers.isEmpty())
+ {
+ _workQueue.queue(_interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.add(handler);
+ }
+ else if((remove & SocketOperation.Read) != 0)
+ {
+ _pendingHandlers.remove(handler);
+ }
}
public void
@@ -312,6 +348,8 @@ public final class ThreadPool
{
ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread);
boolean select = false;
+ java.util.List<EventHandlerOpPair> handlers = new java.util.ArrayList<EventHandlerOpPair>();
+
while(true)
{
if(current._handler != null)
@@ -335,7 +373,7 @@ public final class ThreadPool
{
try
{
- _selector.select(_serverIdleTime);
+ _selector.select(handlers, _serverIdleTime);
}
catch(Selector.TimeoutException ex)
{
@@ -356,8 +394,25 @@ public final class ThreadPool
{
if(select)
{
- _selector.finishSelect(_handlers, _serverIdleTime);
+ java.util.List<EventHandlerOpPair> tmp = _handlers;
+ _handlers = handlers;
+ handlers = tmp;
+
+ if(!_pendingHandlers.isEmpty())
+ {
+ for(EventHandlerOpPair pair : _handlers)
+ {
+ _pendingHandlers.remove(pair.handler);
+ }
+ for(EventHandler p : _pendingHandlers)
+ {
+ _handlers.add(new EventHandlerOpPair(p, SocketOperation.Read));
+ }
+ _pendingHandlers.clear();
+ }
+
_nextHandler = _handlers.iterator();
+ _selector.finishSelect();
select = false;
}
else if(!current._leader && followerWait(current))
@@ -375,9 +430,14 @@ public final class ThreadPool
//
--_inUseIO;
- if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ if(current._handler._hasMoreData.value &&
+ (current._handler._registered & SocketOperation.Read) != 0)
{
- _selector.hasMoreData(current._handler);
+ if(_pendingHandlers.isEmpty())
+ {
+ _workQueue.queue(_interruptWorkItem);
+ }
+ _pendingHandlers.add(current._handler);
}
}
else
@@ -386,7 +446,19 @@ public final class ThreadPool
// If the handler called ioCompleted(), we re-enable the handler in
// case it was disabled and we decrease the number of thread in use.
//
- _selector.enable(current._handler, current.operation);
+ if(_serialize)
+ {
+ _selector.enable(current._handler, current.operation);
+ if(current._handler._hasMoreData.value &&
+ (current._handler._registered & SocketOperation.Read) != 0)
+ {
+ if(_pendingHandlers.isEmpty())
+ {
+ _workQueue.queue(_interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.add(current._handler);
+ }
+ }
assert(_inUse > 0);
--_inUse;
}
@@ -396,20 +468,34 @@ public final class ThreadPool
return; // Wait timed-out.
}
}
- else if(!current._ioCompleted &&
- (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ else if(current._handler._hasMoreData.value &&
+ (current._handler._registered & SocketOperation.Read) != 0)
{
- _selector.hasMoreData(current._handler);
+ if(_pendingHandlers.isEmpty())
+ {
+ _workQueue.queue(_interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.add(current._handler);
}
//
// Get the next ready handler.
//
- if(_nextHandler.hasNext())
+ EventHandlerOpPair next = null;
+ while(_nextHandler.hasNext())
+ {
+ EventHandlerOpPair n = _nextHandler.next();
+ if((n.op & n.handler._registered) != 0)
+ {
+ next = n;
+ break;
+ }
+ }
+ if(next != null)
{
current._ioCompleted = false;
- current._handler = _nextHandler.next();
- current.operation = current._handler._ready;
+ current._handler = next.handler;
+ current.operation = next.op;
thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
}
else
@@ -431,6 +517,7 @@ public final class ThreadPool
}
else
{
+ _handlers.clear();
_selector.startSelect();
select = true;
thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
@@ -462,17 +549,29 @@ public final class ThreadPool
if(_sizeMax > 1)
{
--_inUseIO;
-
- if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+
+ if(!_destroyed)
{
- _selector.hasMoreData(current._handler);
+ if(_serialize)
+ {
+ _selector.disable(current._handler, current.operation);
+
+ // Make sure the handler isn't in the set of pending handlers (this can
+ // for example occur if the handler is has more data and its added by
+ // ThreadPool::update while we were processing IO).
+ _pendingHandlers.remove(current._handler);
+ }
+ else if(current._handler._hasMoreData.value &&
+ (current._handler._registered & SocketOperation.Read) != 0)
+ {
+ if(_pendingHandlers.isEmpty())
+ {
+ _workQueue.queue(_interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.add(current._handler);
+ }
}
-
- if(_serialize && !_destroyed)
- {
- _selector.disable(current._handler, current.operation);
- }
-
+
if(current._leader)
{
//
@@ -527,10 +626,6 @@ public final class ThreadPool
}
}
}
- else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
- {
- _selector.hasMoreData(current._handler);
- }
}
private synchronized void
@@ -563,7 +658,7 @@ public final class ThreadPool
//
// Wait to be promoted and for all the IO threads to be done.
//
- while(!_promote || _inUseIO == _sizeIO || !_nextHandler.hasNext() && _inUseIO > 0)
+ while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0))
{
try
{
@@ -740,8 +835,9 @@ public final class ThreadPool
private int _inUse; // Number of threads that are currently in use.
private int _inUseIO; // Number of threads that are currently performing IO.
- private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>();
- private java.util.Iterator<EventHandler> _nextHandler;
+ private java.util.List<EventHandlerOpPair> _handlers = new java.util.ArrayList<EventHandlerOpPair>();
+ private java.util.Iterator<EventHandlerOpPair> _nextHandler;
+ private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>();
private boolean _promote;
}