diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/Selector.java | 35 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 29 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPoolWorkQueue.java | 125 |
3 files changed, 57 insertions, 132 deletions
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java index 9b0835daad7..ffccf323d93 100644 --- a/java/src/IceInternal/Selector.java +++ b/java/src/IceInternal/Selector.java @@ -15,7 +15,7 @@ public final class Selector { } - public Selector(Instance instance) + Selector(Instance instance) { _instance = instance; _selecting = false; @@ -37,7 +37,7 @@ public final class Selector _keys = _selector.selectedKeys(); } - public void destroy() + void destroy() { try { @@ -49,12 +49,12 @@ public final class Selector _selector = null; } - public void initialize(EventHandler handler) + void initialize(EventHandler handler) { updateImpl(handler); } - public void update(EventHandler handler, int remove, int add) + void update(EventHandler handler, int remove, int add) { int previous = handler._registered; handler._registered = handler._registered & ~remove; @@ -67,7 +67,7 @@ public final class Selector updateImpl(handler); } - public void enable(EventHandler handler, int status) + void enable(EventHandler handler, int status) { if((handler._disabled & status) == 0) { @@ -81,7 +81,7 @@ public final class Selector } } - public void disable(EventHandler handler, int status) + void disable(EventHandler handler, int status) { if((handler._disabled & status) != 0) { @@ -95,7 +95,7 @@ public final class Selector } } - public void finish(EventHandler handler) + void finish(EventHandler handler) { if(handler._registered != 0) { @@ -111,7 +111,7 @@ public final class Selector } } - public void startSelect() + void startSelect() { if(_interrupted) { @@ -125,12 +125,12 @@ public final class Selector _selecting = true; } - public void finishSelect() + void finishSelect() { _selecting = false; } - public void select(java.util.List<EventHandlerOpPair> handlers, long timeout) + void select(java.util.List<EventHandlerOpPair> handlers, long timeout) throws TimeoutException { while(true) @@ -192,7 +192,6 @@ public final class Selector } handlers.clear(); - if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers { return; @@ -225,7 +224,7 @@ public final class Selector } _spuriousWakeUp = 0; - + for(java.nio.channels.SelectionKey key : _keys) { EventHandler handler = (EventHandler)key.attachment(); @@ -245,6 +244,11 @@ public final class Selector } _keys.clear(); } + + void wakeup() + { + _selector.wakeup(); + } private void updateImpl(EventHandler handler) { @@ -297,7 +301,7 @@ public final class Selector _changes.clear(); } - int toJavaOps(EventHandler handler, int o) + private int toJavaOps(EventHandler handler, int o) { int op = 0; if((o & SocketOperation.Read) != 0) @@ -322,7 +326,7 @@ public final class Selector return op; } - int fromJavaOps(int o) + private int fromJavaOps(int o) { int op = 0; if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0) @@ -343,10 +347,9 @@ public final class Selector final private Instance _instance; private java.nio.channels.Selector _selector; - private java.util.Set<java.nio.channels.SelectionKey> _keys; private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>(); private boolean _selecting; private boolean _interrupted; - private int _spuriousWakeUp; + private int _spuriousWakeUp; } diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 291f2973e3b..7ac3440f732 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -191,7 +191,6 @@ public final class ThreadPool _priority = priority; _workQueue = new ThreadPoolWorkQueue(_instance, this, _selector); - _nextHandler = _handlers.iterator(); if(_instance.traceLevels().threadPool >= 1) @@ -378,14 +377,8 @@ public final class ThreadPool } // - // TODO: MJN: InterruptedException leads to a leak as the - // work queue and selector are not destroyed? - // - - // // Destroy the selector // - _workQueue.close(); _selector.destroy(); } @@ -417,21 +410,25 @@ public final class ThreadPool } else if(select) { - try - { - _selector.select(handlers, _serverIdleTime); - } - catch(Selector.TimeoutException ex) + if(_workQueue.size() == 0) { - synchronized(this) + try + { + _selector.select(handlers, _serverIdleTime); + } + catch(Selector.TimeoutException ex) { - if(!_destroyed && _inUse == 0) + synchronized(this) { - _workQueue.queue(new ShutdownWorkItem()); // Select timed-out. + if(!_destroyed && _inUse == 0) + { + _workQueue.queue(new ShutdownWorkItem()); // Select timed-out. + } + continue; } - continue; } } + _workQueue.update(handlers); } synchronized(this) diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java index c46fe75fd3a..208c562ae9c 100644 --- a/java/src/IceInternal/ThreadPoolWorkQueue.java +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -9,31 +9,17 @@ package IceInternal; -import java.util.concurrent.ExecutorService; +import java.util.List; final class ThreadPoolWorkQueue extends EventHandler { ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector) { - _executor = instance.getQueueExecutor(); _threadPool = threadPool; _selector = selector; _destroyed = false; - - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; - try - { - pair.source.configureBlocking(false); - } - catch(java.io.IOException ex) - { - throw new Ice.SyscallException(ex); - } - - _selector.initialize(this); - _selector.update(this, SocketOperation.None, SocketOperation.Read); + + _registered = SocketOperation.Read; } @Override @@ -54,37 +40,15 @@ final class ThreadPoolWorkQueue extends EventHandler } } - public synchronized void - close() - { - try - { - _fdIntrWrite.close(); - } - catch(java.io.IOException ex) - { - } - _fdIntrWrite = null; - - try - { - _fdIntrRead.close(); - } - catch(java.io.IOException ex) - { - } - _fdIntrRead = null; - } - - public synchronized + synchronized void destroy() { assert(!_destroyed); _destroyed = true; - postMessage(); + _selector.wakeup(); } - public synchronized void + synchronized void queue(ThreadPoolWorkItem item) { if(_destroyed) @@ -93,25 +57,13 @@ final class ThreadPoolWorkQueue extends EventHandler } assert(item != null); _workItems.add(item); - postMessage(); + _selector.wakeup(); } @Override public void message(ThreadPoolCurrent current) { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - try - { - buf.rewind(); - int ret = _fdIntrRead.read(buf); - assert(ret > 0); - } - catch(java.io.IOException ex) - { - throw new Ice.SocketException(ex); - } - ThreadPoolWorkItem workItem = null; synchronized(this) { @@ -123,7 +75,7 @@ final class ThreadPoolWorkQueue extends EventHandler else { assert(_destroyed); - postMessage(); + _selector.wakeup(); } } @@ -153,63 +105,36 @@ final class ThreadPoolWorkQueue extends EventHandler } @Override - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { - return (java.nio.channels.SelectableChannel)_fdIntrRead; + return null; } - public void - postMessage() + // Return the number of pending events. + synchronized int size() { - if(_executor != null) + if(_destroyed) { - _executor.submit(new Runnable() { - - @Override - public void run() - { - postMessageInternal(); - } - }); - } - else { - postMessageInternal(); + return 1; } + return _workItems.size(); } - - private void - postMessageInternal() + + synchronized void update(List<EventHandlerOpPair> handlers) { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) + int sz = size(); + while(sz > 0) { - try - { - _fdIntrWrite.write(buf); - } - // - // This is thrown if the thread is interrupted. - // - catch(java.nio.channels.ClosedChannelException ex) - { - break; - } - catch(java.io.IOException ex) - { - throw new Ice.SocketException(ex); - } + handlers.add(_opPair); + --sz; } } private final ThreadPool _threadPool; - private final Selector _selector; - boolean _destroyed; - - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.WritableByteChannel _fdIntrWrite; + private boolean _destroyed; + private Selector _selector; + private EventHandlerOpPair _opPair = new EventHandlerOpPair(this, SocketOperation.Read); private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - private ExecutorService _executor; + } |