summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPoolWorkQueue.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
committerBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
commitb9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree183215e2dbeadfbc871b800ce09726e58af38b91 /java/src/IceInternal/ThreadPoolWorkQueue.java
parentadding compression cookbook demo (diff)
downloadice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src/IceInternal/ThreadPoolWorkQueue.java')
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java182
1 files changed, 182 insertions, 0 deletions
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
new file mode 100644
index 00000000000..3157cd8a6d0
--- /dev/null
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -0,0 +1,182 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+final class ThreadPoolWorkQueue extends EventHandler
+{
+ ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector)
+ {
+ _threadPool = threadPool;
+ _instance = instance;
+ _selector = selector;
+ _destroyed = false;
+
+ Network.SocketPair pair = Network.createPipe();
+ _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
+ _fdIntrWrite = pair.sink;
+ try
+ {
+ pair.source.configureBlocking(false);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SyscallException sys = new Ice.SyscallException();
+ sys.initCause(ex);
+ throw sys;
+ }
+
+ _selector.initialize(this);
+ _selector.update(this, SocketOperation.None, SocketOperation.Read);
+ }
+
+ protected synchronized void
+ finalize()
+ throws Throwable
+ {
+ IceUtilInternal.Assert.FinalizerAssert(_destroyed);
+ }
+
+ public synchronized void
+ close()
+ {
+ try
+ {
+ _fdIntrWrite.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrWrite = null;
+
+ try
+ {
+ _fdIntrRead.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrRead = null;
+ }
+
+ public synchronized
+ void destroy()
+ {
+ assert(!_destroyed);
+ _destroyed = true;
+ postMessage();
+ }
+
+ public synchronized void
+ queue(ThreadPoolWorkItem item)
+ {
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+ _workItems.add(item);
+ postMessage();
+ }
+
+ public void
+ message(ThreadPoolCurrent current)
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ try
+ {
+ buf.rewind();
+ int ret = _fdIntrRead.read(buf);
+ assert(ret > 0);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+
+ ThreadPoolWorkItem workItem = null;
+ synchronized(this)
+ {
+ if(!_workItems.isEmpty())
+ {
+ workItem = _workItems.removeFirst();
+ }
+ else
+ {
+ assert(_destroyed);
+ postMessage();
+ }
+ }
+
+ if(workItem != null)
+ {
+ workItem.execute(current);
+ }
+ else
+ {
+ _threadPool.ioCompleted(current);
+ throw new ThreadPool.DestroyedException();
+ }
+ }
+
+ public void
+ finished(ThreadPoolCurrent current)
+ {
+ assert(false);
+ }
+
+ public String
+ toString()
+ {
+ return "work queue";
+ }
+
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ return (java.nio.channels.SelectableChannel)_fdIntrRead;
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ return false;
+ }
+
+ public void
+ postMessage()
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ buf.put(0, (byte)0);
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ _fdIntrWrite.write(buf);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+ }
+
+ private final ThreadPool _threadPool;
+ private final Instance _instance;
+ private final Selector _selector;
+ boolean _destroyed;
+
+ private java.nio.channels.ReadableByteChannel _fdIntrRead;
+ private java.nio.channels.WritableByteChannel _fdIntrWrite;
+
+ private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
+} \ No newline at end of file