diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /java/src/IceInternal/ThreadPoolWorkQueue.java | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src/IceInternal/ThreadPoolWorkQueue.java')
-rw-r--r-- | java/src/IceInternal/ThreadPoolWorkQueue.java | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java new file mode 100644 index 00000000000..3157cd8a6d0 --- /dev/null +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -0,0 +1,182 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +final class ThreadPoolWorkQueue extends EventHandler +{ + ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector) + { + _threadPool = threadPool; + _instance = instance; + _selector = selector; + _destroyed = false; + + Network.SocketPair pair = Network.createPipe(); + _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; + _fdIntrWrite = pair.sink; + try + { + pair.source.configureBlocking(false); + } + catch(java.io.IOException ex) + { + Ice.SyscallException sys = new Ice.SyscallException(); + sys.initCause(ex); + throw sys; + } + + _selector.initialize(this); + _selector.update(this, SocketOperation.None, SocketOperation.Read); + } + + protected synchronized void + finalize() + throws Throwable + { + IceUtilInternal.Assert.FinalizerAssert(_destroyed); + } + + public synchronized void + close() + { + try + { + _fdIntrWrite.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrWrite = null; + + try + { + _fdIntrRead.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrRead = null; + } + + public synchronized + void destroy() + { + assert(!_destroyed); + _destroyed = true; + postMessage(); + } + + public synchronized void + queue(ThreadPoolWorkItem item) + { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + _workItems.add(item); + postMessage(); + } + + public void + message(ThreadPoolCurrent current) + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + try + { + buf.rewind(); + int ret = _fdIntrRead.read(buf); + assert(ret > 0); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + + ThreadPoolWorkItem workItem = null; + synchronized(this) + { + if(!_workItems.isEmpty()) + { + workItem = _workItems.removeFirst(); + } + else + { + assert(_destroyed); + postMessage(); + } + } + + if(workItem != null) + { + workItem.execute(current); + } + else + { + _threadPool.ioCompleted(current); + throw new ThreadPool.DestroyedException(); + } + } + + public void + finished(ThreadPoolCurrent current) + { + assert(false); + } + + public String + toString() + { + return "work queue"; + } + + public java.nio.channels.SelectableChannel + fd() + { + return (java.nio.channels.SelectableChannel)_fdIntrRead; + } + + public boolean + hasMoreData() + { + return false; + } + + public void + postMessage() + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + buf.put(0, (byte)0); + while(buf.hasRemaining()) + { + try + { + _fdIntrWrite.write(buf); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + } + + private final ThreadPool _threadPool; + private final Instance _instance; + private final Selector _selector; + boolean _destroyed; + + private java.nio.channels.ReadableByteChannel _fdIntrRead; + private java.nio.channels.WritableByteChannel _fdIntrWrite; + + private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); +}
\ No newline at end of file |