summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/Selector.java35
-rw-r--r--java/src/IceInternal/ThreadPool.java29
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java125
3 files changed, 57 insertions, 132 deletions
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java
index 9b0835daad7..ffccf323d93 100644
--- a/java/src/IceInternal/Selector.java
+++ b/java/src/IceInternal/Selector.java
@@ -15,7 +15,7 @@ public final class Selector
{
}
- public Selector(Instance instance)
+ Selector(Instance instance)
{
_instance = instance;
_selecting = false;
@@ -37,7 +37,7 @@ public final class Selector
_keys = _selector.selectedKeys();
}
- public void destroy()
+ void destroy()
{
try
{
@@ -49,12 +49,12 @@ public final class Selector
_selector = null;
}
- public void initialize(EventHandler handler)
+ void initialize(EventHandler handler)
{
updateImpl(handler);
}
- public void update(EventHandler handler, int remove, int add)
+ void update(EventHandler handler, int remove, int add)
{
int previous = handler._registered;
handler._registered = handler._registered & ~remove;
@@ -67,7 +67,7 @@ public final class Selector
updateImpl(handler);
}
- public void enable(EventHandler handler, int status)
+ void enable(EventHandler handler, int status)
{
if((handler._disabled & status) == 0)
{
@@ -81,7 +81,7 @@ public final class Selector
}
}
- public void disable(EventHandler handler, int status)
+ void disable(EventHandler handler, int status)
{
if((handler._disabled & status) != 0)
{
@@ -95,7 +95,7 @@ public final class Selector
}
}
- public void finish(EventHandler handler)
+ void finish(EventHandler handler)
{
if(handler._registered != 0)
{
@@ -111,7 +111,7 @@ public final class Selector
}
}
- public void startSelect()
+ void startSelect()
{
if(_interrupted)
{
@@ -125,12 +125,12 @@ public final class Selector
_selecting = true;
}
- public void finishSelect()
+ void finishSelect()
{
_selecting = false;
}
- public void select(java.util.List<EventHandlerOpPair> handlers, long timeout)
+ void select(java.util.List<EventHandlerOpPair> handlers, long timeout)
throws TimeoutException
{
while(true)
@@ -192,7 +192,6 @@ public final class Selector
}
handlers.clear();
-
if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers
{
return;
@@ -225,7 +224,7 @@ public final class Selector
}
_spuriousWakeUp = 0;
-
+
for(java.nio.channels.SelectionKey key : _keys)
{
EventHandler handler = (EventHandler)key.attachment();
@@ -245,6 +244,11 @@ public final class Selector
}
_keys.clear();
}
+
+ void wakeup()
+ {
+ _selector.wakeup();
+ }
private void updateImpl(EventHandler handler)
{
@@ -297,7 +301,7 @@ public final class Selector
_changes.clear();
}
- int toJavaOps(EventHandler handler, int o)
+ private int toJavaOps(EventHandler handler, int o)
{
int op = 0;
if((o & SocketOperation.Read) != 0)
@@ -322,7 +326,7 @@ public final class Selector
return op;
}
- int fromJavaOps(int o)
+ private int fromJavaOps(int o)
{
int op = 0;
if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0)
@@ -343,10 +347,9 @@ public final class Selector
final private Instance _instance;
private java.nio.channels.Selector _selector;
-
private java.util.Set<java.nio.channels.SelectionKey> _keys;
private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>();
private boolean _selecting;
private boolean _interrupted;
- private int _spuriousWakeUp;
+ private int _spuriousWakeUp;
}
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 291f2973e3b..7ac3440f732 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -191,7 +191,6 @@ public final class ThreadPool
_priority = priority;
_workQueue = new ThreadPoolWorkQueue(_instance, this, _selector);
-
_nextHandler = _handlers.iterator();
if(_instance.traceLevels().threadPool >= 1)
@@ -378,14 +377,8 @@ public final class ThreadPool
}
//
- // TODO: MJN: InterruptedException leads to a leak as the
- // work queue and selector are not destroyed?
- //
-
- //
// Destroy the selector
//
- _workQueue.close();
_selector.destroy();
}
@@ -417,21 +410,25 @@ public final class ThreadPool
}
else if(select)
{
- try
- {
- _selector.select(handlers, _serverIdleTime);
- }
- catch(Selector.TimeoutException ex)
+ if(_workQueue.size() == 0)
{
- synchronized(this)
+ try
+ {
+ _selector.select(handlers, _serverIdleTime);
+ }
+ catch(Selector.TimeoutException ex)
{
- if(!_destroyed && _inUse == 0)
+ synchronized(this)
{
- _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
+ if(!_destroyed && _inUse == 0)
+ {
+ _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
+ }
+ continue;
}
- continue;
}
}
+ _workQueue.update(handlers);
}
synchronized(this)
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
index c46fe75fd3a..208c562ae9c 100644
--- a/java/src/IceInternal/ThreadPoolWorkQueue.java
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -9,31 +9,17 @@
package IceInternal;
-import java.util.concurrent.ExecutorService;
+import java.util.List;
final class ThreadPoolWorkQueue extends EventHandler
{
ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector)
{
- _executor = instance.getQueueExecutor();
_threadPool = threadPool;
_selector = selector;
_destroyed = false;
-
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
- try
- {
- pair.source.configureBlocking(false);
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SyscallException(ex);
- }
-
- _selector.initialize(this);
- _selector.update(this, SocketOperation.None, SocketOperation.Read);
+
+ _registered = SocketOperation.Read;
}
@Override
@@ -54,37 +40,15 @@ final class ThreadPoolWorkQueue extends EventHandler
}
}
- public synchronized void
- close()
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fdIntrWrite = null;
-
- try
- {
- _fdIntrRead.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fdIntrRead = null;
- }
-
- public synchronized
+ synchronized
void destroy()
{
assert(!_destroyed);
_destroyed = true;
- postMessage();
+ _selector.wakeup();
}
- public synchronized void
+ synchronized void
queue(ThreadPoolWorkItem item)
{
if(_destroyed)
@@ -93,25 +57,13 @@ final class ThreadPoolWorkQueue extends EventHandler
}
assert(item != null);
_workItems.add(item);
- postMessage();
+ _selector.wakeup();
}
@Override
public void
message(ThreadPoolCurrent current)
{
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- try
- {
- buf.rewind();
- int ret = _fdIntrRead.read(buf);
- assert(ret > 0);
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SocketException(ex);
- }
-
ThreadPoolWorkItem workItem = null;
synchronized(this)
{
@@ -123,7 +75,7 @@ final class ThreadPoolWorkQueue extends EventHandler
else
{
assert(_destroyed);
- postMessage();
+ _selector.wakeup();
}
}
@@ -153,63 +105,36 @@ final class ThreadPoolWorkQueue extends EventHandler
}
@Override
- public java.nio.channels.SelectableChannel
- fd()
+ public java.nio.channels.SelectableChannel fd()
{
- return (java.nio.channels.SelectableChannel)_fdIntrRead;
+ return null;
}
- public void
- postMessage()
+ // Return the number of pending events.
+ synchronized int size()
{
- if(_executor != null)
+ if(_destroyed)
{
- _executor.submit(new Runnable() {
-
- @Override
- public void run()
- {
- postMessageInternal();
- }
- });
- }
- else {
- postMessageInternal();
+ return 1;
}
+ return _workItems.size();
}
-
- private void
- postMessageInternal()
+
+ synchronized void update(List<EventHandlerOpPair> handlers)
{
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
+ int sz = size();
+ while(sz > 0)
{
- try
- {
- _fdIntrWrite.write(buf);
- }
- //
- // This is thrown if the thread is interrupted.
- //
- catch(java.nio.channels.ClosedChannelException ex)
- {
- break;
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SocketException(ex);
- }
+ handlers.add(_opPair);
+ --sz;
}
}
private final ThreadPool _threadPool;
- private final Selector _selector;
- boolean _destroyed;
-
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
+ private boolean _destroyed;
+ private Selector _selector;
+ private EventHandlerOpPair _opPair = new EventHandlerOpPair(this, SocketOperation.Read);
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
- private ExecutorService _executor;
+
}