summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPoolWorkQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ThreadPoolWorkQueue.java')
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java36
1 files changed, 33 insertions, 3 deletions
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
index 3cf5bc4430e..c46fe75fd3a 100644
--- a/java/src/IceInternal/ThreadPoolWorkQueue.java
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -9,12 +9,14 @@
package IceInternal;
+import java.util.concurrent.ExecutorService;
+
final class ThreadPoolWorkQueue extends EventHandler
{
- ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector)
+ ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector)
{
+ _executor = instance.getQueueExecutor();
_threadPool = threadPool;
- _instance = instance;
_selector = selector;
_destroyed = false;
@@ -89,6 +91,7 @@ final class ThreadPoolWorkQueue extends EventHandler
{
throw new Ice.CommunicatorDestroyedException();
}
+ assert(item != null);
_workItems.add(item);
postMessage();
}
@@ -115,6 +118,7 @@ final class ThreadPoolWorkQueue extends EventHandler
if(!_workItems.isEmpty())
{
workItem = _workItems.removeFirst();
+ assert(workItem != null);
}
else
{
@@ -158,6 +162,25 @@ final class ThreadPoolWorkQueue extends EventHandler
public void
postMessage()
{
+ if(_executor != null)
+ {
+ _executor.submit(new Runnable() {
+
+ @Override
+ public void run()
+ {
+ postMessageInternal();
+ }
+ });
+ }
+ else {
+ postMessageInternal();
+ }
+ }
+
+ private void
+ postMessageInternal()
+ {
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
buf.put(0, (byte)0);
while(buf.hasRemaining())
@@ -166,6 +189,13 @@ final class ThreadPoolWorkQueue extends EventHandler
{
_fdIntrWrite.write(buf);
}
+ //
+ // This is thrown if the thread is interrupted.
+ //
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ break;
+ }
catch(java.io.IOException ex)
{
throw new Ice.SocketException(ex);
@@ -174,7 +204,6 @@ final class ThreadPoolWorkQueue extends EventHandler
}
private final ThreadPool _threadPool;
- private final Instance _instance;
private final Selector _selector;
boolean _destroyed;
@@ -182,4 +211,5 @@ final class ThreadPoolWorkQueue extends EventHandler
private java.nio.channels.WritableByteChannel _fdIntrWrite;
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
+ private ExecutorService _executor;
}