diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-10-01 12:47:13 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-10-01 12:47:13 -0230 |
commit | 7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79 (patch) | |
tree | dfa1173236b7b6928611fe540de89bce4703d359 /java/src/IceInternal/ThreadPoolWorkQueue.java | |
parent | Lots of minor fixes, few simplifications to OutgoingAsync code (diff) | |
download | ice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.tar.bz2 ice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.tar.xz ice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.zip |
Use selector.wakeup() instead of a socket pair to wake the selector.
Diffstat (limited to 'java/src/IceInternal/ThreadPoolWorkQueue.java')
-rw-r--r-- | java/src/IceInternal/ThreadPoolWorkQueue.java | 125 |
1 files changed, 25 insertions, 100 deletions
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java index c46fe75fd3a..208c562ae9c 100644 --- a/java/src/IceInternal/ThreadPoolWorkQueue.java +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -9,31 +9,17 @@ package IceInternal; -import java.util.concurrent.ExecutorService; +import java.util.List; final class ThreadPoolWorkQueue extends EventHandler { ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector) { - _executor = instance.getQueueExecutor(); _threadPool = threadPool; _selector = selector; _destroyed = false; - - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; - try - { - pair.source.configureBlocking(false); - } - catch(java.io.IOException ex) - { - throw new Ice.SyscallException(ex); - } - - _selector.initialize(this); - _selector.update(this, SocketOperation.None, SocketOperation.Read); + + _registered = SocketOperation.Read; } @Override @@ -54,37 +40,15 @@ final class ThreadPoolWorkQueue extends EventHandler } } - public synchronized void - close() - { - try - { - _fdIntrWrite.close(); - } - catch(java.io.IOException ex) - { - } - _fdIntrWrite = null; - - try - { - _fdIntrRead.close(); - } - catch(java.io.IOException ex) - { - } - _fdIntrRead = null; - } - - public synchronized + synchronized void destroy() { assert(!_destroyed); _destroyed = true; - postMessage(); + _selector.wakeup(); } - public synchronized void + synchronized void queue(ThreadPoolWorkItem item) { if(_destroyed) @@ -93,25 +57,13 @@ final class ThreadPoolWorkQueue extends EventHandler } assert(item != null); _workItems.add(item); - postMessage(); + _selector.wakeup(); } @Override public void message(ThreadPoolCurrent current) { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - try - { - buf.rewind(); - int ret = _fdIntrRead.read(buf); - assert(ret > 0); - } - catch(java.io.IOException ex) - { - throw new Ice.SocketException(ex); - } - ThreadPoolWorkItem workItem = null; synchronized(this) { @@ -123,7 +75,7 @@ final class ThreadPoolWorkQueue extends EventHandler else { assert(_destroyed); - postMessage(); + _selector.wakeup(); } } @@ -153,63 +105,36 @@ final class ThreadPoolWorkQueue extends EventHandler } @Override - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { - return (java.nio.channels.SelectableChannel)_fdIntrRead; + return null; } - public void - postMessage() + // Return the number of pending events. + synchronized int size() { - if(_executor != null) + if(_destroyed) { - _executor.submit(new Runnable() { - - @Override - public void run() - { - postMessageInternal(); - } - }); - } - else { - postMessageInternal(); + return 1; } + return _workItems.size(); } - - private void - postMessageInternal() + + synchronized void update(List<EventHandlerOpPair> handlers) { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) + int sz = size(); + while(sz > 0) { - try - { - _fdIntrWrite.write(buf); - } - // - // This is thrown if the thread is interrupted. - // - catch(java.nio.channels.ClosedChannelException ex) - { - break; - } - catch(java.io.IOException ex) - { - throw new Ice.SocketException(ex); - } + handlers.add(_opPair); + --sz; } } private final ThreadPool _threadPool; - private final Selector _selector; - boolean _destroyed; - - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.WritableByteChannel _fdIntrWrite; + private boolean _destroyed; + private Selector _selector; + private EventHandlerOpPair _opPair = new EventHandlerOpPair(this, SocketOperation.Read); private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - private ExecutorService _executor; + } |