summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ThreadPool.java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
commitb36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch)
treedfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/src/IceInternal/ThreadPool.java
parentAdd @Override where possible, and remove trailing white space. (diff)
downloadice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user thread. This makes Ice for Java interrupt safe. This is implemented by the QueueRequestHanbler. - EndpointHostResolver now uses an executor instead of a thread. - Added java/demo/Ice/interrupt and java/test/Ice/interrupt. - Made several changes that must be ported to C++ & C#. - InvocationTimeout exceptions can hang forever. - Connection establishment is always asynchronous. - RequestHandler.requestTimeout and asyncRequestTimeout have been renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/src/IceInternal/ThreadPool.java')
-rw-r--r--java/src/IceInternal/ThreadPool.java91
1 files changed, 58 insertions, 33 deletions
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index e9554519655..0e57a6a61c7 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -57,7 +57,14 @@ public final class ThreadPool
{
// No call to ioCompleted, this shouldn't block (and we don't want to cause
// a new thread to be started).
- _thread.join();
+ try
+ {
+ _thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore.
+ }
}
private final EventHandlerThread _thread;
@@ -183,7 +190,7 @@ public final class ThreadPool
_hasPriority = hasPriority;
_priority = priority;
- _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+ _workQueue = new ThreadPoolWorkQueue(_instance, this, _selector);
_nextHandler = _handlers.iterator();
@@ -216,7 +223,14 @@ public final class ThreadPool
_instance.initializationData().logger.error(s);
destroy();
- joinWithAllThreads();
+ try
+ {
+ joinWithAllThreads();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
throw ex;
}
}
@@ -350,6 +364,7 @@ public final class ThreadPool
public void
joinWithAllThreads()
+ throws InterruptedException
{
//
// _threads is immutable after destroy() has been called,
@@ -363,6 +378,11 @@ public final class ThreadPool
}
//
+ // TODO: MJN: InterruptedException leads to a leak as the
+ // work queue and selector are not destroyed?
+ //
+
+ //
// Destroy the selector
//
_workQueue.close();
@@ -692,36 +712,50 @@ public final class ThreadPool
//
while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0))
{
- try
+ if(_threadIdleTime > 0)
{
- if(_threadIdleTime > 0)
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ boolean interrupted = false;
+ try
{
- long before = IceInternal.Time.currentMonotonicTimeMillis();
+ //
+ // If the wait is interrupted then we'll let the thread die as if it timed out.
+ //
wait(_threadIdleTime * 1000);
- if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ if(interrupted || IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
+ {
+ if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
+ (!_nextHandler.hasNext() && _inUseIO > 0)))
{
- if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
- (!_nextHandler.hasNext() && _inUseIO > 0)))
+ if(_instance.traceLevels().threadPool >= 1)
{
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
- assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
- _threads.remove(current._thread);
- _workQueue.queue(new JoinThreadWorkItem(current._thread));
- return true;
+ String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
}
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.remove(current._thread);
+ _workQueue.queue(new JoinThreadWorkItem(current._thread));
+ return true;
}
}
- else
+ }
+ else
+ {
+ try
{
wait();
}
- }
- catch(InterruptedException ex)
- {
+ catch (InterruptedException e)
+ {
+ //
+ // Eat the InterruptedException.
+ //
+ }
}
}
current._leader = true; // The current thread has become the leader.
@@ -777,18 +811,9 @@ public final class ThreadPool
public void
join()
+ throws InterruptedException
{
- while(true)
- {
- try
- {
- _thread.join();
- break;
- }
- catch(InterruptedException ex)
- {
- }
- }
+ _thread.join();
}
public void