summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPoolWorkQueue.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-10-01 12:47:13 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-10-01 12:47:13 -0230
commit7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79 (patch)
treedfa1173236b7b6928611fe540de89bce4703d359 /java/src/IceInternal/ThreadPoolWorkQueue.java
parentLots of minor fixes, few simplifications to OutgoingAsync code (diff)
downloadice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.tar.bz2
ice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.tar.xz
ice-7f35ba1420d2ccf8ed40d57b6dadc3fad2509d79.zip
Use selector.wakeup() instead of a socket pair to wake the selector.
Diffstat (limited to 'java/src/IceInternal/ThreadPoolWorkQueue.java')
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java125
1 files changed, 25 insertions, 100 deletions
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
index c46fe75fd3a..208c562ae9c 100644
--- a/java/src/IceInternal/ThreadPoolWorkQueue.java
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -9,31 +9,17 @@
package IceInternal;
-import java.util.concurrent.ExecutorService;
+import java.util.List;
final class ThreadPoolWorkQueue extends EventHandler
{
ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector)
{
- _executor = instance.getQueueExecutor();
_threadPool = threadPool;
_selector = selector;
_destroyed = false;
-
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
- try
- {
- pair.source.configureBlocking(false);
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SyscallException(ex);
- }
-
- _selector.initialize(this);
- _selector.update(this, SocketOperation.None, SocketOperation.Read);
+
+ _registered = SocketOperation.Read;
}
@Override
@@ -54,37 +40,15 @@ final class ThreadPoolWorkQueue extends EventHandler
}
}
- public synchronized void
- close()
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fdIntrWrite = null;
-
- try
- {
- _fdIntrRead.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fdIntrRead = null;
- }
-
- public synchronized
+ synchronized
void destroy()
{
assert(!_destroyed);
_destroyed = true;
- postMessage();
+ _selector.wakeup();
}
- public synchronized void
+ synchronized void
queue(ThreadPoolWorkItem item)
{
if(_destroyed)
@@ -93,25 +57,13 @@ final class ThreadPoolWorkQueue extends EventHandler
}
assert(item != null);
_workItems.add(item);
- postMessage();
+ _selector.wakeup();
}
@Override
public void
message(ThreadPoolCurrent current)
{
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- try
- {
- buf.rewind();
- int ret = _fdIntrRead.read(buf);
- assert(ret > 0);
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SocketException(ex);
- }
-
ThreadPoolWorkItem workItem = null;
synchronized(this)
{
@@ -123,7 +75,7 @@ final class ThreadPoolWorkQueue extends EventHandler
else
{
assert(_destroyed);
- postMessage();
+ _selector.wakeup();
}
}
@@ -153,63 +105,36 @@ final class ThreadPoolWorkQueue extends EventHandler
}
@Override
- public java.nio.channels.SelectableChannel
- fd()
+ public java.nio.channels.SelectableChannel fd()
{
- return (java.nio.channels.SelectableChannel)_fdIntrRead;
+ return null;
}
- public void
- postMessage()
+ // Return the number of pending events.
+ synchronized int size()
{
- if(_executor != null)
+ if(_destroyed)
{
- _executor.submit(new Runnable() {
-
- @Override
- public void run()
- {
- postMessageInternal();
- }
- });
- }
- else {
- postMessageInternal();
+ return 1;
}
+ return _workItems.size();
}
-
- private void
- postMessageInternal()
+
+ synchronized void update(List<EventHandlerOpPair> handlers)
{
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
+ int sz = size();
+ while(sz > 0)
{
- try
- {
- _fdIntrWrite.write(buf);
- }
- //
- // This is thrown if the thread is interrupted.
- //
- catch(java.nio.channels.ClosedChannelException ex)
- {
- break;
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SocketException(ex);
- }
+ handlers.add(_opPair);
+ --sz;
}
}
private final ThreadPool _threadPool;
- private final Selector _selector;
- boolean _destroyed;
-
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
+ private boolean _destroyed;
+ private Selector _selector;
+ private EventHandlerOpPair _opPair = new EventHandlerOpPair(this, SocketOperation.Read);
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
- private ExecutorService _executor;
+
}