diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
commit | b36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch) | |
tree | dfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/src | |
parent | Add @Override where possible, and remove trailing white space. (diff) | |
download | ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2 ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip |
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user
thread. This makes Ice for Java interrupt safe. This is implemented
by the QueueRequestHanbler.
- EndpointHostResolver now uses an executor instead of a thread.
- Added java/demo/Ice/interrupt and java/test/Ice/interrupt.
- Made several changes that must be ported to C++ & C#.
- InvocationTimeout exceptions can hang forever.
- Connection establishment is always asynchronous.
- RequestHandler.requestTimeout and asyncRequestTimeout have been
renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/src')
37 files changed, 1375 insertions, 1185 deletions
diff --git a/java/src/Ice/Application.java b/java/src/Ice/Application.java index 3c649f72301..4190f39043e 100644 --- a/java/src/Ice/Application.java +++ b/java/src/Ice/Application.java @@ -240,16 +240,22 @@ public abstract class Application synchronized(_mutex) { + boolean interrupted = false; while(_callbackInProgress) { try { _mutex.wait(); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { + interrupted = true; } } + if(interrupted) + { + Thread.currentThread().interrupt(); + } if(_destroyed) { @@ -504,7 +510,7 @@ public abstract class Application } // - // Note that we let the IllegalStateException propogate + // Note that we let the IllegalStateException propagate // out if necessary. // if(newHook != null) @@ -590,6 +596,7 @@ public abstract class Application } catch(InterruptedException ex) { + break; } } } @@ -625,6 +632,7 @@ public abstract class Application } catch(InterruptedException ex) { + break; } } } diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 2cce84b373e..eed405ad00c 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -88,6 +88,7 @@ public class AsyncResult } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } } @@ -127,6 +128,7 @@ public class AsyncResult } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } } @@ -217,6 +219,12 @@ public class AsyncResult } catch(InterruptedException ex) { + // + // Remove the EndCalled flag since it should be possible to + // call end_* again on the AsyncResult. + // + _state &= ~EndCalled; + throw new Ice.OperationInterruptedException(); } } if(_exception != null) @@ -485,7 +493,8 @@ public class AsyncResult if(handler != null) { - handler.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)this); + handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this, + new Ice.InvocationTimeoutException()); } } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 8d90a391253..3e8c735a99d 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -42,24 +42,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) { - if(callback != null) - { - _startCallback = callback; - return; - } + _startCallback = callback; + return; + } - // - // Wait for the connection to be validated. - // + // + // We start out in holding state. + // + setState(StateHolding); + } + } + catch(Ice.LocalException ex) + { + exception(ex); + callback.connectionStartFailed(this, _exception); + return; + } + + callback.connectionStartCompleted(this); + } + + public void + startAndWait() + throws InterruptedException + { + try + { + synchronized(this) + { + if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + { + assert(_exception != null); + throw (Ice.LocalException)_exception.fillInStackTrace(); + } + + if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) + { while(_state <= StateNotValidated) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } if(_state >= StateClosing) @@ -78,21 +99,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { exception(ex); - if(callback != null) - { - callback.connectionStartFailed(this, _exception); - return; - } - else - { - waitUntilFinished(); - throw ex; - } - } - - if(callback != null) - { - callback.connectionStartCompleted(this); + waitUntilFinished(); + return; } } @@ -171,6 +179,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } @@ -208,21 +217,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void waitUntilHolding() + throws InterruptedException { while(_state < StateHolding || _dispatchCount > 0) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } } public synchronized void waitUntilFinished() + throws InterruptedException { // // We wait indefinitely until the connection is finished and all @@ -232,13 +237,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // while(_state < StateFinished || _dispatchCount > 0) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } assert(_state == StateFinished); @@ -488,19 +487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException { - // - // Wait if flushing is currently in progress. - // - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + waitBatchStreamInUse(); if(_exception != null) { @@ -692,7 +679,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne flushBatchRequests() { IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name); - out.invoke(); + try + { + out.invoke(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } } private static final String __flushBatchRequests_name = "flushBatchRequests"; @@ -769,17 +763,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne synchronized public boolean flushBatchRequests(IceInternal.BatchOutgoing out) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - + waitBatchStreamInUse(); if(_exception != null) { throw (Ice.LocalException)_exception.fillInStackTrace(); @@ -832,16 +816,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + waitBatchStreamInUse(); if(_exception != null) { @@ -941,8 +916,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public void - requestTimedOut(IceInternal.OutgoingMessageCallback out) + synchronized public boolean + requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -964,8 +939,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { it.remove(); } - out.finished(new InvocationTimeoutException()); - return; // We're done. + out.finished(ex); + return true; // We're done. } } @@ -977,16 +952,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(it2.next() == o) { - o.finished(new InvocationTimeoutException()); + o.finished(ex); it2.remove(); - return; // We're done. + return true; // We're done. } } } + return false; } - public void - asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync) + public boolean + asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -1008,8 +984,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { it.remove(); } - outAsync.__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + return true; // We're done } } @@ -1022,13 +998,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(it2.next() == o) { it2.remove(); - outAsync.__dispatchInvocationTimeout(_threadPool, this); - return; // We're done. + outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + return true; // We're done. } } } - } + return false; + } @Override synchronized public void @@ -3092,6 +3069,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private void waitBatchStreamInUse() + { + // + // This is similar to a mutex lock in that the flag is + // only true for a short time period. As such we don't permit the + // wait to be interrupted. Instead the interrupted status is saved\ + // and restored. + // + boolean interrupted = false; + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch (InterruptedException e) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + private static class OutgoingMessage { OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt) @@ -3263,4 +3269,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished }; + } diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 6e2c67ac852..c3ab836967c 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -9,6 +9,8 @@ package Ice; +import java.util.Map; + public final class ObjectAdapterI implements ObjectAdapter { @Override @@ -22,7 +24,7 @@ public final class ObjectAdapterI implements ObjectAdapter } @Override - public synchronized Communicator + public Communicator getCommunicator() { return _communicator; @@ -108,7 +110,7 @@ public final class ObjectAdapterI implements ObjectAdapter synchronized(this) { - assert(!_deactivated); // Not possible if _waitForActivate = true; + assert(_deactivated == DeactivatedState.Steady); // Not possible if _waitForActivate = true; // // Signal threads waiting for the activation. @@ -156,7 +158,21 @@ public final class ObjectAdapterI implements ObjectAdapter for(IceInternal.IncomingConnectionFactory factory : incomingConnectionFactories) { - factory.waitUntilHolding(); + try + { + factory.waitUntilHolding(); + } + catch(InterruptedException ex) + { + synchronized(this) + { + if(--_waitForHold == 0) + { + notifyAll(); + } + } + throw new Ice.OperationInterruptedException(); + } } synchronized(this) @@ -169,7 +185,7 @@ public final class ObjectAdapterI implements ObjectAdapter // // If we don't need to retry, we're done. Otherwise, we wait until // all the waiters finish waiting on the connections and we try - // again waiting on all the conncetions. This is necessary in the + // again waiting on all the connections. This is necessary in the // case activate() is called by another thread while waitForHold() // waits on the some connection, if we didn't retry, waitForHold() // could return only after waiting on a subset of the connections. @@ -187,8 +203,9 @@ public final class ObjectAdapterI implements ObjectAdapter { wait(); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } _waitForHoldRetry = false; @@ -210,7 +227,7 @@ public final class ObjectAdapterI implements ObjectAdapter // Ignore deactivation requests if the object adapter has // already been deactivated. // - if(_deactivated) + if(_deactivated != DeactivatedState.Steady) { return; } @@ -228,6 +245,7 @@ public final class ObjectAdapterI implements ObjectAdapter } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } @@ -249,9 +267,7 @@ public final class ObjectAdapterI implements ObjectAdapter outgoingConnectionFactory = _instance.outgoingConnectionFactory(); locatorInfo = _locatorInfo; - _deactivated = true; - - notifyAll(); + _deactivated = DeactivatedState.Deactivating; } try @@ -282,48 +298,54 @@ public final class ObjectAdapterI implements ObjectAdapter // requests being dispatched. // outgoingConnectionFactory.removeAdapter(this); + + synchronized(this) + { + _deactivated = DeactivatedState.Deactivated; + notifyAll(); + } } @Override public void waitForDeactivate() { - IceInternal.IncomingConnectionFactory[] incomingConnectionFactories; - - synchronized(this) + try { - if(_destroyed) - { - return; - } - - // - // Wait for deactivation of the adapter itself, and - // for the return of all direct method calls using this - // adapter. - // - while(!_deactivated || _directCount > 0) + IceInternal.IncomingConnectionFactory[] incomingConnectionFactories; + synchronized(this) { - try + if(_destroyed) { - wait(); + return; } - catch(InterruptedException ex) + + // + // Wait for deactivation of the adapter itself, and + // for the return of all direct method calls using this + // adapter. + // + while((_deactivated != DeactivatedState.Deactivated) || _directCount > 0) { + wait(); } + + incomingConnectionFactories = + _incomingConnectionFactories.toArray(new IceInternal.IncomingConnectionFactory[0]); } - incomingConnectionFactories = - _incomingConnectionFactories.toArray(new IceInternal.IncomingConnectionFactory[0]); + // + // Now we wait for until all incoming connection factories are + // finished. + // + for(IceInternal.IncomingConnectionFactory f : incomingConnectionFactories) + { + f.waitUntilFinished(); + } } - - // - // Now we wait for until all incoming connection factories are - // finished. - // - for(IceInternal.IncomingConnectionFactory f : incomingConnectionFactories) + catch (InterruptedException e) { - f.waitUntilFinished(); + throw new Ice.OperationInterruptedException(); } } @@ -331,7 +353,7 @@ public final class ObjectAdapterI implements ObjectAdapter public synchronized boolean isDeactivated() { - return _deactivated; + return _deactivated == DeactivatedState.Deactivated; } @Override @@ -352,6 +374,7 @@ public final class ObjectAdapterI implements ObjectAdapter } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } @@ -384,7 +407,18 @@ public final class ObjectAdapterI implements ObjectAdapter if(_threadPool != null) { _threadPool.destroy(); - _threadPool.joinWithAllThreads(); + try + { + _threadPool.joinWithAllThreads(); + } + catch (InterruptedException e) + { + synchronized(this) + { + _destroying = false; + } + throw new Ice.OperationInterruptedException(); + } } IceInternal.ObjectAdapterFactory objectAdapterFactory; @@ -495,7 +529,7 @@ public final class ObjectAdapterI implements ObjectAdapter } @Override - public synchronized java.util.Map<String, Ice.Object> + public synchronized Map<String, Object> removeAllFacets(Identity ident) { checkForDeactivation(); @@ -877,7 +911,7 @@ public final class ObjectAdapterI implements ObjectAdapter IceInternal.ObjectAdapterFactory objectAdapterFactory, String name, RouterPrx router, boolean noConfig) { - _deactivated = false; + _deactivated = DeactivatedState.Steady; _instance = instance; _communicator = communicator; _objectAdapterFactory = objectAdapterFactory; @@ -929,7 +963,7 @@ public final class ObjectAdapterI implements ObjectAdapter // // These need to be set to prevent finalizer from complaining. // - _deactivated = true; + _deactivated = DeactivatedState.Deactivated; _destroyed = true; _instance = null; _incomingConnectionFactories = null; @@ -1089,7 +1123,7 @@ public final class ObjectAdapterI implements ObjectAdapter { try { - if(!_deactivated) + if(_deactivated != DeactivatedState.Deactivated) { _instance.initializationData().logger.warning("object adapter `" + getName() + "' has not been deactivated"); @@ -1173,7 +1207,7 @@ public final class ObjectAdapterI implements ObjectAdapter private void checkForDeactivation() { - if(_deactivated) + if(_deactivated != DeactivatedState.Steady) { ObjectAdapterDeactivatedException ex = new ObjectAdapterDeactivatedException(); ex.name = getName(); @@ -1606,7 +1640,12 @@ public final class ObjectAdapterI implements ObjectAdapter return noProps; } - private boolean _deactivated; + private enum DeactivatedState { + Steady, + Deactivating, + Deactivated + }; + private DeactivatedState _deactivated; private IceInternal.Instance _instance; private Communicator _communicator; private IceInternal.ObjectAdapterFactory _objectAdapterFactory; diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 27504e0d4c2..2f9d1cd67d4 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -10,6 +10,7 @@ package Ice; import Ice.Instrumentation.InvocationObserver; +import IceInternal.QueueRequestHandler; /** * Base class for all proxies. @@ -2391,8 +2392,15 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.RequestHandler handler = null; try { - handler = __getRequestHandler(false); - return handler.getConnection(true); + handler = __getRequestHandler(); + try + { + return handler.waitForConnection(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } } catch(Ice.Exception ex) { @@ -2412,6 +2420,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } catch(InterruptedException ex1) { + throw new Ice.OperationInterruptedException(); } } } @@ -2460,7 +2469,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { try { - return handler.getConnection(false); + return handler.getConnection(); } catch(LocalException ex) { @@ -2477,7 +2486,14 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable ice_flushBatchRequests() { IceInternal.BatchOutgoing __og = new IceInternal.BatchOutgoing(this, __ice_flushBatchRequests_name); - __og.invoke(); + try + { + __og.invoke(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } } /** @@ -2746,7 +2762,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } public final IceInternal.RequestHandler - __getRequestHandler(boolean async) + __getRequestHandler() { if(_reference.getCacheConnection()) { @@ -2756,16 +2772,12 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { return _requestHandler; } - // async = true to avoid blocking with the proxy mutex locked. - _requestHandler = createRequestHandler(true); + _requestHandler = createRequestHandler(); return _requestHandler; } } - final int mode = _reference.getMode(); - return createRequestHandler(async || - mode == IceInternal.Reference.ModeBatchOneway || - mode == IceInternal.Reference.ModeBatchDatagram); + return createRequestHandler(); } public void @@ -2777,7 +2789,21 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { if(previous == _requestHandler) { - _requestHandler = handler; + if(handler != null) + { + if(_reference.getInstance().queueRequests()) + { + _requestHandler = new QueueRequestHandler(_reference.getInstance(), handler); + } + else + { + _requestHandler = handler; + } + } + else + { + _requestHandler = null; + } } else if(previous != null && _requestHandler != null) { @@ -2788,9 +2814,23 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable // update the request handler. See bug ICE-5489 for reasons why // this can be useful. // - if(previous.getConnection(false) == _requestHandler.getConnection(false)) + if(previous.getConnection() == _requestHandler.getConnection()) { - _requestHandler = handler; + if(handler != null) + { + if(_reference.getInstance().queueRequests()) + { + _requestHandler = new QueueRequestHandler(_reference.getInstance(), handler); + } + else + { + _requestHandler = handler; + } + } + else + { + _requestHandler = null; + } } } catch(Ice.Exception ex) @@ -2803,7 +2843,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } private IceInternal.RequestHandler - createRequestHandler(boolean async) + createRequestHandler() { if(_reference.getCollocationOptimized()) { @@ -2814,14 +2854,12 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - if(async) - { - return (new IceInternal.ConnectRequestHandler(_reference, this)).connect(); - } - else + IceInternal.RequestHandler handler = (new IceInternal.ConnectRequestHandler(_reference, this)).connect(); + if(_reference.getInstance().queueRequests()) { - return new IceInternal.ConnectionRequestHandler(_reference, this); + handler = new QueueRequestHandler(_reference.getInstance(), handler); } + return handler; } // diff --git a/java/src/Ice/ThreadNotification.java b/java/src/Ice/ThreadNotification.java index ef274ee9b41..f98e7570fa8 100644 --- a/java/src/Ice/ThreadNotification.java +++ b/java/src/Ice/ThreadNotification.java @@ -11,7 +11,7 @@ package Ice; /** * Interface for thread notification hooks. Applications can derive - * a class tat implements the <code>start</code> and <code>stop</code> + * a class that implements the <code>start</code> and <code>stop</code> * methods to intercept creation and destruction of threads created * by the Ice run time. * diff --git a/java/src/Ice/Util.java b/java/src/Ice/Util.java index ece779d6e06..e591f4b75af 100644 --- a/java/src/Ice/Util.java +++ b/java/src/Ice/Util.java @@ -736,7 +736,6 @@ public final class Util public final static Ice.EncodingVersion Encoding_1_0 = new Ice.EncodingVersion((byte)1, (byte)0); public final static Ice.EncodingVersion Encoding_1_1 = new Ice.EncodingVersion((byte)1, (byte)1); - private static String _localAddress = null; private static java.lang.Object _processLoggerMutex = new java.lang.Object(); private static Logger _processLogger = null; } diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 8d8a999e5af..efc602aa773 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -9,6 +9,8 @@ package IceInternal; +import java.io.IOException; + public class BasicStream { public @@ -941,16 +943,31 @@ public class BasicStream { return null; } + ObjectInputStream in = null; try { InputStreamWrapper w = new InputStreamWrapper(sz, this); - ObjectInputStream in = new ObjectInputStream(_instance, w); + in = new ObjectInputStream(_instance, w); return (java.io.Serializable)in.readObject(); } catch(java.lang.Exception ex) { throw new Ice.MarshalException("cannot deserialize object", ex); } + finally + { + if(in != null) + { + try + { + in.close(); + } + catch (IOException ex) + { + throw new Ice.MarshalException("cannot deserialize object", ex); + } + } + } } public void diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index a3294769e56..a5f958eb10b 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -35,6 +35,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback public void invoke() + throws InterruptedException { assert(_proxy != null || _connection != null); @@ -49,13 +50,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback { while(_exception == null && !_sent) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } if(_exception != null) { @@ -68,7 +63,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback RequestHandler handler = null; try { - handler = _proxy.__getRequestHandler(false); + handler = _proxy.__getRequestHandler(); if(handler.sendRequest(this)) { return; @@ -84,17 +79,11 @@ public final class BatchOutgoing implements OutgoingMessageCallback long deadline = now + timeout; while(_exception == null && !_sent && !timedOut) { - try - { - wait(deadline - now); - if(_exception == null && !_sent) - { - now = Time.currentMonotonicTimeMillis(); - timedOut = now >= deadline; - } - } - catch(InterruptedException ex) + wait(deadline - now); + if(_exception == null && !_sent) { + now = Time.currentMonotonicTimeMillis(); + timedOut = now >= deadline; } } } @@ -102,32 +91,21 @@ public final class BatchOutgoing implements OutgoingMessageCallback { while(_exception == null && !_sent) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } } } if(timedOut) { - handler.requestTimedOut(this); - - synchronized(this) + if(handler.requestCanceled(this, new Ice.InvocationTimeoutException())) { - while(_exception == null) + synchronized(this) { - try + while(_exception == null) { wait(); } - catch(InterruptedException ex) - { - } } } } diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 1d839796356..74777db0d4b 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -86,7 +86,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync @Override public void - __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection) + __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { threadPool.dispatch( new DispatchWorkItem(connection) @@ -95,7 +95,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync public void run() { - BatchOutgoingAsync.this.__finished(new Ice.InvocationTimeoutException()); + BatchOutgoingAsync.this.__finished(ex); } }); } diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 1b47feccbb9..4c79b2e3c58 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -97,17 +97,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler synchronized public void prepareBatchRequest(BasicStream os) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitStreamInUse(); if(_batchStream.isEmpty()) { try @@ -232,8 +222,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - synchronized public void - requestTimedOut(OutgoingMessageCallback out) + synchronized public boolean + requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) { Integer requestId = _sendRequests.get(out); if(requestId != null) @@ -242,8 +232,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _requests.remove(requestId); } - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); _sendRequests.remove(out); + return true; } else if(out instanceof Outgoing) { @@ -253,17 +244,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { if(e.getValue() == o) { - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); _requests.remove(e.getKey()); - return; // We're done. + return true; // We're done. } } } + return false; } @Override - synchronized public void - asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) + synchronized public boolean + asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { Integer requestId = _sendAsyncRequests.get(outAsync); if(requestId != null) @@ -273,8 +265,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } if(outAsync instanceof OutgoingAsync) @@ -286,11 +278,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } } } + return false; } public void @@ -364,17 +357,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler int invokeNum; synchronized(this) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitStreamInUse(); invokeNum = _batchRequestNum; if(_batchRequestNum > 0) @@ -428,16 +411,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler int invokeNum; synchronized(this) { - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } + waitStreamInUse(); invokeNum = _batchRequestNum; if(_batchRequestNum > 0) @@ -570,7 +544,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public Ice.ConnectionI - getConnection(boolean wait) + getConnection() + { + return null; + } + + @Override + public Ice.ConnectionI + waitForConnection() { return null; } @@ -720,6 +701,36 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } + private void + waitStreamInUse() + { + // + // This is similar to a mutex lock in that the stream is + // only "locked" while marshaling. As such we don't permit the wait + // to be interrupted. Instead the interrupted status is saved and + // restored. + // + boolean interrupted = false; + while(_batchStreamInUse) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + private final Reference _reference; private final boolean _dispatcher; private final boolean _response; diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index 6e96ea9f81f..3c3d07bcc4e 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -60,8 +60,10 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult return false; } + // TODO: MJN: This is missing a test. + @Override public void - __finished(Ice.LocalException ex, boolean sent) + __finished(Ice.Exception ex) { if(_childObserver != null) { diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 276395ef3ef..9cc70b6a76d 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.ConnectionI; + public class ConnectRequestHandler implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback { @@ -62,17 +64,7 @@ public class ConnectRequestHandler { synchronized(this) { - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - + waitBatchRequestInProgress(); try { if(!initialized()) @@ -189,14 +181,14 @@ public class ConnectRequestHandler } @Override - public void - requestTimedOut(OutgoingMessageCallback out) + public boolean + requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) { synchronized(this) { if(_exception != null) { - return; // The request has been notified of a failure already. + return false; // The request has been notified of a failure already. } if(!initialized()) @@ -207,26 +199,26 @@ public class ConnectRequestHandler Request request = it.next(); if(request.out == out) { - out.finished(new Ice.InvocationTimeoutException()); + out.finished(ex); it.remove(); - return; + return true; } } assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection.requestTimedOut(out); + return _connection.requestCanceled(out, ex); } @Override - public void - asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) + public boolean + asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) { synchronized(this) { if(_exception != null) { - return; // The request has been notified of a failure already. + return false; // The request has been notified of a failure already. } if(!initialized()) @@ -238,14 +230,14 @@ public class ConnectRequestHandler if(request.outAsync == outAsync) { it.remove(); - outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null); - return; // We're done + outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + return true; // We're done } } assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection.asyncRequestTimedOut(outAsync); + return _connection.asyncRequestCanceled(outAsync, ex); } @Override @@ -256,37 +248,33 @@ public class ConnectRequestHandler } @Override - synchronized public Ice.ConnectionI - getConnection(boolean waitInit) - { - if(waitInit) - { - // - // Wait for the connection establishment to complete or fail. - // - while(!_initialized && _exception == null) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - } - + synchronized public ConnectionI + getConnection() { if(_exception != null) { throw (Ice.LocalException)_exception.fillInStackTrace(); } else { - assert(!waitInit || _initialized); return _connection; } } + @Override + synchronized public + ConnectionI waitForConnection() + throws InterruptedException + { + // + // Wait for the connection establishment to complete or fail. + // + while(!_initialized && _exception == null) + { + wait(); + } + return getConnection(); + } + // // Implementation of Reference.GetConnectionCallback // @@ -338,14 +326,14 @@ public class ConnectRequestHandler if(!_requests.isEmpty()) { _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) - { - @Override - public void - run() - { - flushRequestsWithException(); - }; - }); + { + @Override + public void + run() + { + flushRequestsWithException(); + }; + }); } notifyAll(); @@ -393,16 +381,29 @@ public class ConnectRequestHandler } else { + // + // This is similar to a mutex lock in that the flag is + // only true for a short period of time. + // + boolean interrupted = false; while(_flushing && _exception == null) { try { wait(); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { + interrupted = true; } } + // + // Restore the interrupted status. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } if(_exception != null) { @@ -421,17 +422,7 @@ public class ConnectRequestHandler synchronized(this) { assert(_connection != null && !_initialized); - - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } + waitBatchRequestInProgress(); // // We set the _flushing flag to true to prevent any additional queuing. Callers @@ -566,7 +557,35 @@ public class ConnectRequestHandler } } - void + private void + waitBatchRequestInProgress() + { + // + // This is similar to a mutex lock in that the stream is + // only "locked" while the request is in progress. + // + boolean interrupted = false; + while(_batchRequestInProgress) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + + private void flushRequestsWithException() { for(Request request : _requests) diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 8d44be04e5b..4dcf9db63e8 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -13,54 +13,51 @@ public class ConnectionRequestHandler implements RequestHandler { @Override public void - prepareBatchRequest(BasicStream out) - throws RetryException - { + prepareBatchRequest(BasicStream out) throws RetryException { _connection.prepareBatchRequest(out); } @Override public void - finishBatchRequest(BasicStream out) - { + finishBatchRequest(BasicStream out) { _connection.finishBatchRequest(out, _compress); } @Override public void - abortBatchRequest() - { + abortBatchRequest() { _connection.abortBatchRequest(); } @Override public boolean sendRequest(OutgoingMessageCallback out) - throws RetryException - { - return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response + throws RetryException { + // + // Finished if sent and no response. + // + return out.send(_connection, _compress, _response) && !_response; } @Override public int sendAsyncRequest(OutgoingAsyncMessageCallback out) - throws RetryException - { + throws RetryException { return out.__send(_connection, _compress, _response); } @Override - public void - requestTimedOut(OutgoingMessageCallback out) + public boolean + requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) { - _connection.requestTimedOut(out); + return _connection.requestCanceled(out, ex); } @Override - public void - asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) + public boolean + asyncRequestCanceled(OutgoingAsyncMessageCallback outgoingAsync, Ice.LocalException ex) { - _connection.asyncRequestTimedOut(outAsync); + return _connection.asyncRequestCanceled(outgoingAsync, ex); } @Override @@ -72,35 +69,20 @@ public class ConnectionRequestHandler implements RequestHandler @Override public Ice.ConnectionI - getConnection(boolean wait) + getConnection() { return _connection; } - public - ConnectionRequestHandler(Reference ref, Ice.ObjectPrx proxy) + @Override + public Ice.ConnectionI + waitForConnection() { - _reference = ref; - _response = _reference.getMode() == Reference.ModeTwoway; - - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - _connection = _reference.getConnection(compress); - _compress = compress.value; - - // - // If this proxy is for a non-local object, and we are using a router, then - // add this proxy to the router info object. - // - IceInternal.RouterInfo ri = _reference.getRouterInfo(); - if(ri != null) - { - ri.addProxy(proxy); - } + return _connection; } - public - ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress) - { + public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, + boolean compress) { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; _connection = connection; @@ -111,4 +93,5 @@ public class ConnectionRequestHandler implements RequestHandler private final boolean _response; private final Ice.ConnectionI _connection; private final boolean _compress; + } diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index 66a75956a3d..ee49e23ddf2 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -18,13 +18,10 @@ public class EndpointHostResolver _preferIPv6 = instance.preferIPv6(); try { - _thread = new HelperThread(); + _threadName = Util.createThreadName(_instance.initializationData().properties, "Ice.HostResolver"); + _executor = java.util.concurrent.Executors.newFixedThreadPool(1, + Util.createThreadFactory(_instance.initializationData().properties, _threadName)); updateObserver(); - if(_instance.initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0) - { - _thread.setPriority(Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice")); - } - _thread.start(); } catch(RuntimeException ex) { @@ -92,8 +89,8 @@ public class EndpointHostResolver return connectors; } - synchronized public void resolve(String host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint, - EndpointI_connectors callback) + synchronized public void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint, + final EndpointI_connectors callback) { // // TODO: Optimize to avoid the lookup if the given host is a textual IPv4 or IPv6 @@ -103,134 +100,105 @@ public class EndpointHostResolver assert(!_destroyed); - ResolveEntry entry = new ResolveEntry(); - entry.host = host; - entry.port = port; - entry.selType = selType; - entry.endpoint = endpoint; - entry.callback = callback; - - Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); - if(obsv != null) + final Ice.Instrumentation.ThreadObserver threadObserver = _observer; + final Ice.Instrumentation.Observer observer = getObserver(endpoint); + if(observer != null) { - entry.observer = obsv.getEndpointLookupObserver(endpoint); - if(entry.observer != null) - { - entry.observer.attach(); - } + observer.attach(); } - _queue.add(entry); - notify(); - } - - synchronized public void destroy() - { - assert(!_destroyed); - _destroyed = true; - notify(); - } - - public void joinWithThread() - { - if(_thread != null) - { - try - { - _thread.join(); - } - catch(InterruptedException ex) - { - } - if(_observer != null) + _executor.execute(new Runnable() { - _observer.detach(); - } - } - } - - public void run() - { - while(true) - { - ResolveEntry r; - Ice.Instrumentation.ThreadObserver threadObserver; - synchronized(this) - { - while(!_destroyed && _queue.isEmpty()) + @Override + public void run() { + synchronized(EndpointHostResolver.this) + { + if(_destroyed) + { + Ice.CommunicatorDestroyedException ex = new Ice.CommunicatorDestroyedException(); + if(observer != null) + { + observer.failed(ex.ice_name()); + observer.detach(); + } + callback.exception(ex); + return; + } + } + try { - wait(); + if(threadObserver != null) + { + threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle, + Ice.Instrumentation.ThreadState.ThreadStateInUseForOther); + } + + NetworkProxy networkProxy = _instance.networkProxy(); + if(networkProxy != null) + { + networkProxy = networkProxy.resolveHost(); + } + + callback.connectors(endpoint.connectors(Network.getAddresses(host, + port, + _protocol, + selType, + _preferIPv6), + networkProxy)); + + if(threadObserver != null) + { + threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForOther, + Ice.Instrumentation.ThreadState.ThreadStateIdle); + } + + if(observer != null) + { + observer.detach(); + } } - catch(java.lang.InterruptedException ex) + catch(Ice.LocalException ex) { + if(observer != null) + { + observer.failed(ex.ice_name()); + observer.detach(); + } + callback.exception(ex); } } + }); + } - if(_destroyed) - { - break; - } - - r = _queue.removeFirst(); - threadObserver = _observer; - } - - try - { - if(threadObserver != null) - { - threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle, - Ice.Instrumentation.ThreadState.ThreadStateInUseForOther); - } - - NetworkProxy networkProxy = _instance.networkProxy(); - if(networkProxy != null) - { - networkProxy = networkProxy.resolveHost(); - } - - r.callback.connectors(r.endpoint.connectors(Network.getAddresses(r.host, - r.port, - _protocol, - r.selType, - _preferIPv6), - networkProxy)); + synchronized public void destroy() + { + assert(!_destroyed); + _destroyed = true; - if(threadObserver != null) - { - threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForOther, - Ice.Instrumentation.ThreadState.ThreadStateIdle); - } + // + // Shutdown the executor. No new tasks will be accepted. + // Existing tasks will execute. + // + _executor.shutdown(); + } - if(r.observer != null) - { - r.observer.detach(); - } - } - catch(Ice.LocalException ex) - { - if(r.observer != null) - { - r.observer.failed(ex.ice_name()); - r.observer.detach(); - } - r.callback.exception(ex); - } + public void joinWithThread() + throws InterruptedException + { + // Wait for the executor to terminate. + try + { + _executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS); } - - for(ResolveEntry entry : _queue) + finally { - Ice.CommunicatorDestroyedException ex = new Ice.CommunicatorDestroyedException(); - if(entry.observer != null) + if(_observer != null) { - entry.observer.failed(ex.ice_name()); - entry.observer.detach(); + _observer.detach(); } - entry.callback.exception(ex); } - _queue.clear(); } synchronized public void updateObserver() @@ -238,8 +206,7 @@ public class EndpointHostResolver Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { - _observer = obsv.getThreadObserver("Communicator", - _thread.getName(), + _observer = obsv.getThreadObserver("Communicator", _threadName, Ice.Instrumentation.ThreadState.ThreadStateIdle, _observer); if(_observer != null) @@ -249,49 +216,22 @@ public class EndpointHostResolver } } - static class ResolveEntry + private Ice.Instrumentation.Observer + getObserver(IPEndpointI endpoint) { - String host; - int port; - Ice.EndpointSelectionType selType; - IPEndpointI endpoint; - EndpointI_connectors callback; - Ice.Instrumentation.Observer observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); + if(obsv != null) + { + return obsv.getEndpointLookupObserver(endpoint); + } + return null; } private final Instance _instance; private final int _protocol; private final boolean _preferIPv6; private boolean _destroyed; - private java.util.LinkedList<ResolveEntry> _queue = new java.util.LinkedList<ResolveEntry>(); private Ice.Instrumentation.ThreadObserver _observer; - - private final class HelperThread extends Thread - { - HelperThread() - { - String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); - if(threadName.length() > 0) - { - threadName += "-"; - } - setName(threadName + "Ice.HostResolver"); - } - - @Override - public void run() - { - try - { - EndpointHostResolver.this.run(); - } - catch(java.lang.Exception ex) - { - String s = "exception in endpoint host resolver thread " + getName() + ":\n" + Ex.toString(ex); - _instance.initializationData().logger.error(s); - } - } - } - - private HelperThread _thread; + private String _threadName; + private java.util.concurrent.ExecutorService _executor; } diff --git a/java/src/IceInternal/FixedReference.java b/java/src/IceInternal/FixedReference.java index 9df5d5f4b68..c930fe6156c 100644 --- a/java/src/IceInternal/FixedReference.java +++ b/java/src/IceInternal/FixedReference.java @@ -210,78 +210,70 @@ public class FixedReference extends Reference } @Override - public Ice.ConnectionI - getConnection(Ice.BooleanHolder compress) + public void + getConnection(GetConnectionCallback callback) { - switch(getMode()) + try { - case Reference.ModeTwoway: - case Reference.ModeOneway: - case Reference.ModeBatchOneway: + Ice.BooleanHolder compress = new Ice.BooleanHolder(); + switch(getMode()) { - if(_fixedConnection.endpoint().datagram()) + case Reference.ModeTwoway: + case Reference.ModeOneway: + case Reference.ModeBatchOneway: { - throw new Ice.NoEndpointException(""); + if(_fixedConnection.endpoint().datagram()) + { + throw new Ice.NoEndpointException(""); + } + break; } - break; - } - case Reference.ModeDatagram: - case Reference.ModeBatchDatagram: - { - if(!_fixedConnection.endpoint().datagram()) + case Reference.ModeDatagram: + case Reference.ModeBatchDatagram: { - throw new Ice.NoEndpointException(""); + if(!_fixedConnection.endpoint().datagram()) + { + throw new Ice.NoEndpointException(""); + } + break; } - break; } - } - - // - // If a secure connection is requested or secure overrides is set, - // check if the connection is secure. - // - boolean secure; - DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); - if(defaultsAndOverrides.overrideSecure) - { - secure = defaultsAndOverrides.overrideSecureValue; - } - else - { - secure = getSecure(); - } - if(secure && !_fixedConnection.endpoint().secure()) - { - throw new Ice.NoEndpointException(""); - } - _fixedConnection.throwException(); // Throw in case our connection is already destroyed. + // + // If a secure connection is requested or secure overrides is set, + // check if the connection is secure. + // + boolean secure; + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideSecure) + { + secure = defaultsAndOverrides.overrideSecureValue; + } + else + { + secure = getSecure(); + } + if(secure && !_fixedConnection.endpoint().secure()) + { + throw new Ice.NoEndpointException(""); + } - if(defaultsAndOverrides.overrideCompress) - { - compress.value = defaultsAndOverrides.overrideCompressValue; - } - else if(_overrideCompress) - { - compress.value = _compress; - } - else - { - compress.value = _fixedConnection.endpoint().compress(); - } - return _fixedConnection; - } + _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - @Override - public void - getConnection(GetConnectionCallback callback) - { - try - { - Ice.BooleanHolder compress = new Ice.BooleanHolder(); - Ice.ConnectionI connection = getConnection(compress); - callback.setConnection(connection, compress.value); + if(defaultsAndOverrides.overrideCompress) + { + compress.value = defaultsAndOverrides.overrideCompressValue; + } + else if(_overrideCompress) + { + compress.value = _compress; + } + else + { + compress.value = _fixedConnection.endpoint().compress(); + } + callback.setConnection(_fixedConnection, compress.value); } catch(Ice.LocalException ex) { diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 683d619875c..e0413fc1b5d 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -40,6 +40,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public void waitUntilHolding() + throws InterruptedException { java.util.LinkedList<Ice.ConnectionI> connections; @@ -51,13 +52,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice // while(_state < StateHolding) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } // @@ -78,6 +73,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public void waitUntilFinished() + throws InterruptedException { java.util.LinkedList<Ice.ConnectionI> connections = null; @@ -89,13 +85,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice // while(_state != StateFinished) { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } // @@ -114,7 +104,20 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { for(Ice.ConnectionI connection : connections) { - connection.waitUntilFinished(); + try + { + connection.waitUntilFinished(); + } + catch(InterruptedException e) + { + // + // Force close all of the connections. + // + for(Ice.ConnectionI c : connections) + { + c.close(true); + } + } } } @@ -375,7 +378,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice Ice.ConnectionI connection = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, null, _transceiver, null, _endpoint, _adapter); - connection.start(null); + connection.startAndWait(); _connections.add(connection); } else @@ -425,6 +428,10 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { throw (Ice.LocalException)ex; } + else if(ex instanceof InterruptedException) + { + throw new Ice.OperationInterruptedException(); + } else { throw new Ice.SyscallException(ex); diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 8eb6de0f210..fbe2f3333fb 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -9,28 +9,27 @@ package IceInternal; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + public final class Instance { private class ObserverUpdaterI implements Ice.Instrumentation.ObserverUpdater { - ObserverUpdaterI(Instance instance) - { - _instance = instance; - } - - @Override public void + @Override + public void updateConnectionObservers() { - _instance.updateConnectionObservers(); + Instance.this.updateConnectionObservers(); } - @Override public void + @Override + public void updateThreadObservers() { - _instance.updateThreadObservers(); + Instance.this.updateThreadObservers(); } - - final private Instance _instance; } public Ice.InitializationData @@ -603,6 +602,18 @@ public final class Instance return _useApplicationClassLoader; } + public boolean + queueRequests() + { + return _hasQueueExecutor; + } + + public ExecutorService + getQueueExecutor() + { + return _queueExecutor; + } + // // Only for use by Ice.CommunicatorI // @@ -735,7 +746,6 @@ public final class Instance } } - _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2); _implicitContext = Ice.ImplicitContextI.create(_initData.properties.getProperty("Ice.ImplicitContext")); @@ -834,6 +844,22 @@ public final class Instance { _observer = _initData.observer; } + + if(_initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0) + { + _hasQueueExecutor = true; + _queueExecutor = Executors.newFixedThreadPool(1, + Util.createThreadFactory(_initData.properties, + Util.createThreadName(_initData.properties, "Ice.BackgroundIO"))); + // + // If background IO is enabled message buffers cannot be cached. + // + _cacheMessageBuffers = 0; + } + else + { + _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2); + } } catch(Ice.LocalException ex) { @@ -889,7 +915,7 @@ public final class Instance // if(_observer != null) { - _observer.setObserverUpdater(new ObserverUpdaterI(this)); + _observer.setObserverUpdater(new ObserverUpdaterI()); } // @@ -899,29 +925,9 @@ public final class Instance { java.util.concurrent.ScheduledThreadPoolExecutor executor = new java.util.concurrent.ScheduledThreadPoolExecutor(1, - new java.util.concurrent.ThreadFactory() - { - @Override - public Thread newThread(Runnable r) - { - Thread t = new Thread(r); - if(initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0) - { - final int priority = Util.getThreadPriorityProperty( - initializationData().properties, "Ice"); - t.setPriority(priority); - } - - String threadName = initializationData().properties.getProperty("Ice.ProgramName"); - if(threadName.length() > 0) - { - threadName += "-"; - } - t.setName(threadName + "Ice.Timer"); + Util.createThreadFactory(_initData.properties, + Util.createThreadName(_initData.properties, "Ice.Timer"))); - return t; - } - }); executor.setRemoveOnCancelPolicy(true); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); _timer = executor; @@ -1014,6 +1020,7 @@ public final class Instance _state = StateDestroyInProgress; } + if(_objectAdapterFactory != null) { _objectAdapterFactory.shutdown(); @@ -1031,7 +1038,19 @@ public final class Instance if(_outgoingConnectionFactory != null) { - _outgoingConnectionFactory.waitUntilFinished(); + try + { + _outgoingConnectionFactory.waitUntilFinished(); + } + catch (InterruptedException e) + { + // + // Restore the interrupt, otherwise the instance will be + // left in an undefined state. The thread joins below will + // interrupt which is fine. + // + Thread.currentThread().interrupt(); + } } if(_retryQueue != null) @@ -1132,20 +1151,41 @@ public final class Instance _state = StateDestroyed; } - // - // Join with threads outside the synchronization. - // - if(clientThreadPool != null) + try { - clientThreadPool.joinWithAllThreads(); + // + // Join with threads outside the synchronization. + // + if(clientThreadPool != null) + { + clientThreadPool.joinWithAllThreads(); + } + if(serverThreadPool != null) + { + serverThreadPool.joinWithAllThreads(); + } + if(endpointHostResolver != null) + { + endpointHostResolver.joinWithThread(); + } } - if(serverThreadPool != null) + catch(InterruptedException ex) { - serverThreadPool.joinWithAllThreads(); + throw new Ice.OperationInterruptedException(); } - if(endpointHostResolver != null) + + if(_queueExecutor != null) { - endpointHostResolver.joinWithThread(); + _queueExecutor.shutdown(); + try + { + _queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + _queueExecutor = null; } if(_initData.properties.getPropertyAsInt("Ice.Warn.UnusedProperties") > 0) @@ -1288,4 +1328,6 @@ public final class Instance final private boolean _useApplicationClassLoader; private static boolean _oneOffDone = false; + private boolean _hasQueueExecutor = false; + private ExecutorService _queueExecutor; } diff --git a/java/src/IceInternal/LocatorInfo.java b/java/src/IceInternal/LocatorInfo.java index 5afb2a3170a..7f043465c71 100644 --- a/java/src/IceInternal/LocatorInfo.java +++ b/java/src/IceInternal/LocatorInfo.java @@ -117,66 +117,6 @@ public final class LocatorInfo } } - synchronized EndpointI[] - getEndpoints(Reference ref, Reference wellKnownRef, int ttl, Ice.BooleanHolder cached) - { - if(!_response || _exception == null) - { - if(wellKnownRef != null) // This request is to resolve the endpoints of a cached well-known object ref - { - _wellKnownRefs.add(wellKnownRef); - } - if(!_sent) - { - _sent = true; - send(); - } - - while(!_response && _exception == null) - { - try - { - wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - } - - if(_exception != null) - { - _locatorInfo.getEndpointsException(ref, _exception); // This throws. - } - - assert(_response); - EndpointI[] endpoints = null; - if(_proxy != null) - { - Reference r = ((Ice.ObjectPrxHelperBase)_proxy).__reference(); - if(!r.isIndirect()) - { - endpoints = r.getEndpoints(); - } - else if(ref.isWellKnown() && !r.isWellKnown()) - { - // - // We're resolving the endpoints of a well-known object and the proxy returned - // by the locator is an indirect proxy. We now need to resolve the endpoints - // of this indirect proxy. - // - return _locatorInfo.getEndpoints(r, ref, ttl, cached); - } - } - - cached.value = false; - if(_ref.getInstance().traceLevels().location >= 1) - { - _locatorInfo.getEndpointsTrace(ref, endpoints, false); - } - return endpoints == null ? new EndpointI[0] : endpoints; - } - Request(LocatorInfo locatorInfo, Reference ref) { _locatorInfo = locatorInfo; @@ -397,67 +337,6 @@ public final class LocatorInfo } } - public EndpointI[] - getEndpoints(Reference ref, int ttl, Ice.BooleanHolder cached) - { - return getEndpoints(ref, null, ttl, cached); - } - - public EndpointI[] - getEndpoints(Reference ref, Reference wellKnownRef, int ttl, Ice.BooleanHolder cached) - { - assert(ref.isIndirect()); - EndpointI[] endpoints = null; - cached.value = false; - if(!ref.isWellKnown()) - { - endpoints = _table.getAdapterEndpoints(ref.getAdapterId(), ttl, cached); - if(!cached.value) - { - if(_background && endpoints != null) - { - getAdapterRequest(ref).addCallback(ref, wellKnownRef, ttl, null); - } - else - { - return getAdapterRequest(ref).getEndpoints(ref, wellKnownRef, ttl, cached); - } - } - } - else - { - Reference r = _table.getObjectReference(ref.getIdentity(), ttl, cached); - if(!cached.value) - { - if(_background && r != null) - { - getObjectRequest(ref).addCallback(ref, null, ttl, null); - } - else - { - return getObjectRequest(ref).getEndpoints(ref, null, ttl, cached); - } - } - - if(!r.isIndirect()) - { - endpoints = r.getEndpoints(); - } - else if(!r.isWellKnown()) - { - return getEndpoints(r, ref, ttl, cached); - } - } - - assert(endpoints != null); - cached.value = true; - if(ref.getInstance().traceLevels().location >= 1) - { - getEndpointsTrace(ref, endpoints, true); - } - return endpoints; - } - public void getEndpoints(Reference ref, int ttl, GetEndpointsCallback callback) { diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index 308be141f5b..5b9688a70d7 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -26,12 +26,7 @@ public final class ObjectAdapterFactory return; } - _instance = null; - _communicator = null; - adapters = new java.util.LinkedList<Ice.ObjectAdapterI>(_adapters); - - notifyAll(); } // @@ -42,8 +37,16 @@ public final class ObjectAdapterFactory { adapter.deactivate(); } + + synchronized(this) + { + _instance = null; + _communicator = null; + notifyAll(); + } } + public void waitForShutdown() { @@ -61,6 +64,7 @@ public final class ObjectAdapterFactory } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index c4f79392e36..333671c8b86 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -97,11 +97,43 @@ public final class Outgoing implements OutgoingMessageCallback _exception = null; _sent = false; - _handler = _proxy.__getRequestHandler(false); - - if(_handler.sendRequest(this)) // Request sent and no response expected, we're done. + _handler = _proxy.__getRequestHandler(); + try + { + if(_handler.sendRequest(this)) // Request sent and no response expected, we're done. + { + return true; + } + } + catch(Ice.OperationInterruptedException ex) { - return true; + if(_handler.requestCanceled(this, new Ice.OperationInterruptedException())) + { + // + // Wait for the exception to propagate. It's possible the request handler ignores + // the timeout if there was a failure shortly before requestTimedOut got called. + // In this case, the exception should be set on the Outgoing. + // + synchronized(this) + { + boolean interrupted = false; + while(_exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex2) + { + interrupted = true; + } + } + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + } } boolean timedOut = false; @@ -124,6 +156,7 @@ public final class Outgoing implements OutgoingMessageCallback } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } if((_state == StateInProgress || !_sent) && _state != StateFailed) { @@ -142,6 +175,7 @@ public final class Outgoing implements OutgoingMessageCallback } catch(InterruptedException ex) { + throw new Ice.OperationInterruptedException(); } } } @@ -149,23 +183,30 @@ public final class Outgoing implements OutgoingMessageCallback if(timedOut) { - _handler.requestTimedOut(this); - - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. - // In this case, the exception should be set on the Outgoing. - // - synchronized(this) + if(_handler.requestCanceled(this, new Ice.InvocationTimeoutException())) { - while(_exception == null) + // + // Wait for the exception to propagate. It's possible the request handler ignores + // the timeout if there was a failure shortly before requestTimedOut got called. + // In this case, the exception should be set on the Outgoing. + // + synchronized(this) { - try + boolean interrupted = false; + while(_exception == null) { - wait(); + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } } - catch(InterruptedException ex) + if(interrupted) { + Thread.currentThread().interrupt(); } } } @@ -203,6 +244,7 @@ public final class Outgoing implements OutgoingMessageCallback } catch(InterruptedException exi) { + throw new Ice.OperationInterruptedException(); } } } @@ -584,7 +626,7 @@ public final class Outgoing implements OutgoingMessageCallback { try { - _handler = _proxy.__getRequestHandler(true); + _handler = _proxy.__getRequestHandler(); _handler.prepareBatchRequest(_os); break; } diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 9095fdb94c5..2451e7dc99b 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -198,7 +198,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa @Override public void - __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection) + __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { threadPool.dispatch( new DispatchWorkItem(connection) @@ -207,7 +207,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa public void run() { - OutgoingAsync.this.__finished(new Ice.InvocationTimeoutException()); + OutgoingAsync.this.__finished(ex); } }); } @@ -389,10 +389,10 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { while(true) { + _handler = _proxy.__getRequestHandler(); try { _sent = false; - _handler = _proxy.__getRequestHandler(true); int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { @@ -431,6 +431,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } break; } + catch(Ice.OperationInterruptedException ex) + { + // + // Clear the request handler, and cancel the outgoing request. + // + _proxy.__setRequestHandler(_handler, null); + _handler.asyncRequestCanceled(this, ex); + break; + } catch(RetryException ex) { _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index 269ba2a0123..6c2259b35be 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -43,7 +43,7 @@ public interface OutgoingAsyncMessageCallback void __finished(Ice.Exception ex); // - // Helper to dispatch invocation timeout. + // Helper to dispatch the cancellation exception. // - void __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection); + void __dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection); } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 63ebc0bd919..4a893444be8 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -81,227 +81,94 @@ public final class OutgoingConnectionFactory } } + // Called from Instance.destroy(). public void waitUntilFinished() + throws InterruptedException { - java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; - - synchronized(this) + try { - // - // First we wait until the factory is destroyed. We also - // wait until there are no pending connections - // anymore. Only then we can be sure the _connections - // contains all connections. - // - while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) + java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null; + synchronized(this) { - try + // + // First we wait until the factory is destroyed. We also + // wait until there are no pending connections + // anymore. Only then we can be sure the _connections + // contains all connections. + // + while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) { wait(); } - catch(InterruptedException ex) - { - } - } - - // - // We want to wait until all connections are finished outside the - // thread synchronization. - // - connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); - } - // - // Now we wait until the destruction of each connection is finished. - // - for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) - { - for(Ice.ConnectionI connection : connectionList) - { - connection.waitUntilFinished(); - } - } - - synchronized(this) - { - // Ensure all the connections are finished and reapable at this point. - java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); - if(cons != null) - { - int size = 0; - for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) - { - size += connectionList.size(); - } - assert(cons.size() == size); - _connections.clear(); - _connectionsByEndpoint.clear(); - } - else - { - assert(_connections.isEmpty()); - assert(_connectionsByEndpoint.isEmpty()); + // + // We want to wait until all connections are finished outside the + // thread synchronization. + // + connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections); } - _monitor.destroy(); - } - } - - public Ice.ConnectionI - create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, Ice.BooleanHolder compress) - { - assert(endpts.length > 0); - - // - // Apply the overrides. - // - java.util.List<EndpointI> endpoints = applyOverrides(endpts); - - // - // Try to find a connection to one of the given endpoints. - // - Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress); - if(connection != null) - { - return connection; - } - - Ice.LocalException exception = null; - - // - // If we didn't find a connection with the endpoints, we create the connectors - // for the endpoints. - // - java.util.List<ConnectorInfo> connectors = new java.util.ArrayList<ConnectorInfo>(); - java.util.Iterator<EndpointI> p = endpoints.iterator(); - while(p.hasNext()) - { - EndpointI endpoint = p.next(); // - // Create connectors for the endpoint. + // Now we wait until the destruction of each connection is finished. // - try + for(java.util.List<Ice.ConnectionI> connectionList : connections.values()) { - java.util.List<Connector> cons = endpoint.connectors(selType); - assert(cons.size() > 0); - for(Connector c : cons) - { - connectors.add(new ConnectorInfo(c, endpoint)); - } - } - catch(Ice.LocalException ex) - { - exception = ex; - handleException(exception, hasMore || p.hasNext()); - } - } - - if(connectors.isEmpty()) - { - assert(exception != null); - throw exception; - } - - // - // Try to get a connection to one of the connectors. A null result indicates that no - // connection was found and that we should try to establish the connection (and that - // the connectors were added to _pending to prevent other threads from establishing - // the connection). - // - connection = getConnection(connectors, null, compress); - if(connection != null) - { - return connection; - } - - // - // Try to establish the connection to the connectors. - // - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); - java.util.Iterator<ConnectorInfo> q = connectors.iterator(); - ConnectorInfo ci = null; - while(q.hasNext()) - { - ci = q.next(); - - Ice.Instrumentation.Observer observer = null; - if(obsv != null) - { - observer = obsv.getConnectionEstablishmentObserver(ci.endpoint, ci.connector.toString()); - if(observer != null) + for(Ice.ConnectionI connection : connectionList) { - observer.attach(); + try + { + connection.waitUntilFinished(); + } + catch(InterruptedException e) + { + // + // Force close all of the connections. + // + for(java.util.List<Ice.ConnectionI> l : connections.values()) + { + for(Ice.ConnectionI c : l) + { + c.close(true); + } + } + throw e; + } } } - try + synchronized(this) { - connection = createConnection(ci.connector.connect(), ci); - connection.start(null); - - if(observer != null) + // Ensure all the connections are finished and reapable at this point. + java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); + if(cons != null) { - observer.detach(); - } - - if(defaultsAndOverrides.overrideCompress) - { - compress.value = defaultsAndOverrides.overrideCompressValue; + int size = 0; + for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) + { + size += connectionList.size(); + } + assert(cons.size() == size); + _connections.clear(); + _connectionsByEndpoint.clear(); } else { - compress.value = ci.endpoint.compress(); + assert(_connections.isEmpty()); + assert(_connectionsByEndpoint.isEmpty()); } - connection.activate(); - break; - } - catch(Ice.CommunicatorDestroyedException ex) - { - if(observer != null) - { - observer.failed(ex.ice_name()); - observer.detach(); - } - exception = ex; - handleConnectionException(exception, hasMore || p.hasNext()); - connection = null; - break; // No need to continue - } - catch(Ice.LocalException ex) - { - if(observer != null) - { - observer.failed(ex.ice_name()); - observer.detach(); - } - exception = ex; - handleConnectionException(exception, hasMore || p.hasNext()); - connection = null; + _monitor.destroy(); } } - - // - // Finish creating the connection (this removes the connectors from the _pending - // list and notifies any waiting threads). - // - if(connection != null) + catch(InterruptedException ex) { - finishGetConnection(connectors, ci, connection, null); - } - else - { - finishGetConnection(connectors, exception, null); - } - - if(connection == null) - { - assert(exception != null); - throw exception; + // Here wait() or waitUntilFinished() were interrupted. Clear the connections + // and such and continue along. + _connections.clear(); + _connectionsByEndpoint.clear(); + _monitor.destroy(); + throw ex; } - - return connection; } public void @@ -611,6 +478,7 @@ public final class OutgoingConnectionFactory private Ice.ConnectionI getConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.BooleanHolder compress) { + assert(cb != null); synchronized(this) { if(_destroyed) @@ -655,26 +523,7 @@ public final class OutgoingConnectionFactory if(addToPending(cb, connectors)) { - // - // If a callback is not specified we wait until another thread notifies us about a - // change to the pending list. Otherwise, if a callback is provided we're done: - // when the pending list changes the callback will be notified and will try to - // get the connection again. - // - if(cb == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - else - { - return null; - } + return null; } else { @@ -1104,7 +953,7 @@ public final class OutgoingConnectionFactory } } - public void + void setConnection(Ice.ConnectionI connection, boolean compress) { // @@ -1115,7 +964,7 @@ public final class OutgoingConnectionFactory _factory.decPendingConnectCount(); // Must be called last. } - public void + void setException(Ice.LocalException ex) { // @@ -1125,13 +974,13 @@ public final class OutgoingConnectionFactory _factory.decPendingConnectCount(); // Must be called last. } - public boolean + boolean hasConnector(ConnectorInfo ci) { return _connectors.contains(ci); } - public boolean + boolean removeConnectors(java.util.List<ConnectorInfo> connectors) { _connectors.removeAll(connectors); @@ -1139,13 +988,13 @@ public final class OutgoingConnectionFactory return _connectors.isEmpty(); } - public void + void removeFromPending() { _factory.removeFromPending(this, _connectors); } - void + private void getConnectors() { try @@ -1166,7 +1015,7 @@ public final class OutgoingConnectionFactory nextEndpoint(); } - void + private void nextEndpoint() { try @@ -1181,7 +1030,7 @@ public final class OutgoingConnectionFactory } } - void + private void getConnection() { try @@ -1196,7 +1045,7 @@ public final class OutgoingConnectionFactory { // // A null return value from getConnection indicates that the connection - // is being established and that everthing has been done to ensure that + // is being established and that everything has been done to ensure that // the callback will be notified when the connection establishment is // done. // @@ -1213,7 +1062,7 @@ public final class OutgoingConnectionFactory } } - void + private void nextConnector() { Ice.ConnectionI connection = null; diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java index ce5eee21f7c..f27b671e3bc 100644 --- a/java/src/IceInternal/PropertyNames.java +++ b/java/src/IceInternal/PropertyNames.java @@ -70,6 +70,7 @@ public final class PropertyNames new Property("Ice\\.Admin\\.Facets", false, null), new Property("Ice\\.Admin\\.InstanceName", false, null), new Property("Ice\\.Admin\\.ServerId", false, null), + new Property("Ice\\.BackgroundIO", false, null), new Property("Ice\\.BackgroundLocatorCacheUpdates", false, null), new Property("Ice\\.BatchAutoFlush", false, null), new Property("Ice\\.ChangeUser", false, null), diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index 15c4b26500f..4f24a1919e6 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -25,7 +25,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync RequestHandler handler = null; try { - handler = _proxy.__getRequestHandler(true); + handler = _proxy.__getRequestHandler(); int status = handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java index c0b2d60e150..54b43d7ab29 100644 --- a/java/src/IceInternal/ProxyFactory.java +++ b/java/src/IceInternal/ProxyFactory.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.OperationInterruptedException; + public final class ProxyFactory { public Ice.ObjectPrx @@ -211,6 +213,14 @@ public final class ProxyFactory throw ex; } + // + // Don't retry on OperationInterruptedException. + // + if(ex instanceof OperationInterruptedException) + { + throw ex; + } + ++cnt; assert(cnt > 0); diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java new file mode 100644 index 00000000000..6c2a5b55f2c --- /dev/null +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -0,0 +1,357 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; +import Ice.ConnectionI; + +public class QueueRequestHandler implements RequestHandler +{ + public + QueueRequestHandler(Instance instance, RequestHandler delegate) { + _executor = instance.getQueueExecutor(); + if(_executor == null) + { + throw new CommunicatorDestroyedException(); + } + assert(delegate != null); + _delegate = delegate; + } + + @Override + public void + prepareBatchRequest(final BasicStream out) throws RetryException + { + try + { + Future<Void> future = _executor.submit(new Callable<Void>() { + @Override + public Void call() throws RetryException + { + _delegate.prepareBatchRequest(out); + return null; + } + }); + + future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RetryException ex) + { + throw ex; + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + + @Override + public void + finishBatchRequest(final BasicStream out) + { + try + { + Future<Void> future = _executor.submit(new Callable<Void>() { + @Override + public Void call() + { + _delegate.finishBatchRequest(out); + return null; + } + }); + future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + + @Override + public void + abortBatchRequest() + { + try + { + Future<Void> future = _executor.submit(new Callable<Void>() { + @Override + public Void call() + { + _delegate.abortBatchRequest(); + return null; + } + }); + future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + + @Override + public boolean + sendRequest(final OutgoingMessageCallback out) throws RetryException + { + try + { + Future<Boolean> future = _executor.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws RetryException + { + return _delegate.sendRequest(out); + } + }); + return future.get(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RetryException ex) + { + throw ex; + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + return false; + } + + @Override + public int + sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException + { + try + { + Future<Integer> future = _executor.submit(new Callable<Integer>() { + @Override + public Integer call() throws RetryException + { + return _delegate.sendAsyncRequest(out); + } + }); + return future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RetryException ex) + { + throw ex; + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + return 0; + } + + @Override + public boolean + requestCanceled(final OutgoingMessageCallback out, final Ice.LocalException ex) + { + try + { + Future<Boolean> future = _executor.submit(new Callable<Boolean>() { + @Override + public Boolean call() + { + return _delegate.requestCanceled(out, ex); + } + }); + return future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException exc) + { + throw exc; + } + catch(Throwable exc) + { + assert(false); + } + } + return false; + } + + @Override + public boolean + asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex) + { + try + { + Future<Boolean> future = _executor.submit(new Callable<Boolean>() { + @Override + public Boolean call() + { + return _delegate.asyncRequestCanceled(outAsync, ex); + } + }); + return future.get(); + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } + catch (ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException exc) + { + throw exc; + } + catch(Throwable exc) + { + assert(false); + } + } + return false; + } + + @Override + public Reference + getReference() + { + return _delegate.getReference(); + } + + @Override + public ConnectionI + getConnection() + { + return _delegate.getConnection(); + } + + @Override + public ConnectionI + waitForConnection() throws InterruptedException + { + return _delegate.waitForConnection(); + } + + private final RequestHandler _delegate; + private final ExecutorService _executor; +} diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java index f0f49a75260..e4789875723 100644 --- a/java/src/IceInternal/Reference.java +++ b/java/src/IceInternal/Reference.java @@ -412,7 +412,6 @@ public abstract class Reference implements Cloneable // public abstract java.util.Map<String, String> toProperty(String prefix); - public abstract Ice.ConnectionI getConnection(Ice.BooleanHolder comp); public abstract void getConnection(GetConnectionCallback callback); @Override diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index e09c6019932..894c986a84d 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -22,10 +22,13 @@ public interface RequestHandler int sendAsyncRequest(OutgoingAsyncMessageCallback out) throws RetryException; - void requestTimedOut(OutgoingMessageCallback out); - void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync); + boolean requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex); + boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex); Reference getReference(); - Ice.ConnectionI getConnection(boolean wait); + Ice.ConnectionI getConnection(); + Ice.ConnectionI waitForConnection() + throws InterruptedException; + } diff --git a/java/src/IceInternal/RoutableReference.java b/java/src/IceInternal/RoutableReference.java index 705aebc6475..8e89f69cd15 100644 --- a/java/src/IceInternal/RoutableReference.java +++ b/java/src/IceInternal/RoutableReference.java @@ -496,72 +496,6 @@ public class RoutableReference extends Reference } @Override - public Ice.ConnectionI - getConnection(Ice.BooleanHolder comp) - { - if(_routerInfo != null) - { - // - // If we route, we send everything to the router's client - // proxy endpoints. - // - EndpointI[] endpts = _routerInfo.getClientEndpoints(); - if(endpts.length > 0) - { - applyOverrides(endpts); - return createConnection(endpts, comp); - } - } - - if(_endpoints.length > 0) - { - return createConnection(_endpoints, comp); - } - - while(true) - { - Ice.BooleanHolder cached = new Ice.BooleanHolder(false); - EndpointI[] endpts = null; - if(_locatorInfo != null) - { - endpts = _locatorInfo.getEndpoints(this, _locatorCacheTimeout, cached); - applyOverrides(endpts); - } - - if(endpts == null || endpts.length == 0) - { - throw new Ice.NoEndpointException(toString()); - } - - try - { - return createConnection(endpts, comp); - } - catch(Ice.NoEndpointException ex) - { - throw ex; // No need to retry if there's no endpoints. - } - catch(Ice.LocalException ex) - { - assert(_locatorInfo != null); - _locatorInfo.clearCache(this); - if(cached.value) - { - TraceLevels traceLevels = getInstance().traceLevels(); - if(traceLevels.retry >= 2) - { - String s = "connection to cached endpoints failed\n" + - "removing endpoints from cache and trying one more time\n" + ex; - getInstance().initializationData().logger.trace(traceLevels.retryCat, s); - } - continue; // Try again if the endpoints were cached. - } - throw ex; - } - } - } - - @Override public void getConnection(final GetConnectionCallback callback) { @@ -861,77 +795,6 @@ public class RoutableReference extends Reference return endpoints.toArray(new EndpointI[endpoints.size()]); } - protected Ice.ConnectionI - createConnection(EndpointI[] allEndpoints, Ice.BooleanHolder compress) - { - EndpointI[] endpoints = filterEndpoints(allEndpoints); - if(endpoints.length == 0) - { - throw new Ice.NoEndpointException(toString()); - } - - // - // Finally, create the connection. - // - OutgoingConnectionFactory factory = getInstance().outgoingConnectionFactory(); - Ice.ConnectionI connection = null; - if(getCacheConnection() || endpoints.length == 1) - { - // - // Get an existing connection or create one if there's no - // existing connection to one of the given endpoints. - // - connection = factory.create(endpoints, false, getEndpointSelection(), compress); - } - else - { - // - // Go through the list of endpoints and try to create the - // connection until it succeeds. This is different from just - // calling create() with the given endpoints since this might - // create a new connection even if there's an existing - // connection for one of the endpoints. - // - - Ice.LocalException exception = null; - EndpointI[] endpoint = new EndpointI[1]; - for(int i = 0; i < endpoints.length; ++i) - { - try - { - endpoint[0] = endpoints[i]; - final boolean more = i != endpoints.length - 1; - connection = factory.create(endpoint, more, getEndpointSelection(), compress); - break; - } - catch(Ice.LocalException ex) - { - exception = ex; - } - } - - if(connection == null) - { - assert(exception != null); - throw exception; - } - } - - assert(connection != null); - - // - // If we have a router, set the object adapter for this router - // (if any) to the new connection, so that callbacks from the - // router can be received over this new connection. - // - if(_routerInfo != null && _routerInfo.getAdapter() != null) - { - connection.setAdapter(_routerInfo.getAdapter()); - } - - return connection; - } - protected void createConnection(EndpointI[] allEndpoints, final GetConnectionCallback callback) { @@ -993,7 +856,7 @@ public class RoutableReference extends Reference new OutgoingConnectionFactory.CreateConnectionCallback() { @Override - public void + public void setConnection(Ice.ConnectionI connection, boolean compress) { // @@ -1009,7 +872,7 @@ public class RoutableReference extends Reference } @Override - public void + public void setException(final Ice.LocalException ex) { if(_exception == null) diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java index 55c7582fbf5..9b0835daad7 100644 --- a/java/src/IceInternal/Selector.java +++ b/java/src/IceInternal/Selector.java @@ -209,8 +209,11 @@ public final class Selector { Thread.sleep(1); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { + // + // Eat the InterruptedException (as we do in ThreadPool.promoteFollower). + // } if(++_spuriousWakeUp > 100) diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index bdaa5bca9a1..135299ebbc7 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -154,7 +154,6 @@ final class TcpTransceiver implements Transceiver } @Override - @SuppressWarnings("deprecation") public int write(Buffer buf) { final int size = buf.b.limit(); @@ -240,7 +239,6 @@ final class TcpTransceiver implements Transceiver } @Override - @SuppressWarnings("deprecation") public int read(Buffer buf, Ice.BooleanHolder moreData) { int packetSize = buf.b.remaining(); @@ -254,8 +252,8 @@ final class TcpTransceiver implements Transceiver try { assert(_fd != null); - int ret = _fd.read(buf.b); + int ret = _fd.read(buf.b); if(ret == -1) { throw new Ice.ConnectionLostException(); @@ -330,7 +328,6 @@ final class TcpTransceiver implements Transceiver } } - @SuppressWarnings("deprecation") TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd, NetworkProxy proxy, java.net.InetSocketAddress addr) { @@ -357,7 +354,6 @@ final class TcpTransceiver implements Transceiver } } - @SuppressWarnings("deprecation") TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd) { _instance = instance; diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index e9554519655..0e57a6a61c7 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -57,7 +57,14 @@ public final class ThreadPool { // No call to ioCompleted, this shouldn't block (and we don't want to cause // a new thread to be started). - _thread.join(); + try + { + _thread.join(); + } + catch (InterruptedException e) + { + // Ignore. + } } private final EventHandlerThread _thread; @@ -183,7 +190,7 @@ public final class ThreadPool _hasPriority = hasPriority; _priority = priority; - _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); + _workQueue = new ThreadPoolWorkQueue(_instance, this, _selector); _nextHandler = _handlers.iterator(); @@ -216,7 +223,14 @@ public final class ThreadPool _instance.initializationData().logger.error(s); destroy(); - joinWithAllThreads(); + try + { + joinWithAllThreads(); + } + catch (InterruptedException e) + { + throw new Ice.OperationInterruptedException(); + } throw ex; } } @@ -350,6 +364,7 @@ public final class ThreadPool public void joinWithAllThreads() + throws InterruptedException { // // _threads is immutable after destroy() has been called, @@ -363,6 +378,11 @@ public final class ThreadPool } // + // TODO: MJN: InterruptedException leads to a leak as the + // work queue and selector are not destroyed? + // + + // // Destroy the selector // _workQueue.close(); @@ -692,36 +712,50 @@ public final class ThreadPool // while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0)) { - try + if(_threadIdleTime > 0) { - if(_threadIdleTime > 0) + long before = IceInternal.Time.currentMonotonicTimeMillis(); + boolean interrupted = false; + try { - long before = IceInternal.Time.currentMonotonicTimeMillis(); + // + // If the wait is interrupted then we'll let the thread die as if it timed out. + // wait(_threadIdleTime * 1000); - if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000) + } + catch (InterruptedException e) + { + interrupted = true; + } + if(interrupted || IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000) + { + if(!_destroyed && (!_promote || _inUseIO == _sizeIO || + (!_nextHandler.hasNext() && _inUseIO > 0))) { - if(!_destroyed && (!_promote || _inUseIO == _sizeIO || - (!_nextHandler.hasNext() && _inUseIO > 0))) + if(_instance.traceLevels().threadPool >= 1) { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } - assert(_threads.size() > 1); // Can only be called by a waiting follower thread. - _threads.remove(current._thread); - _workQueue.queue(new JoinThreadWorkItem(current._thread)); - return true; + String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); } + assert(_threads.size() > 1); // Can only be called by a waiting follower thread. + _threads.remove(current._thread); + _workQueue.queue(new JoinThreadWorkItem(current._thread)); + return true; } } - else + } + else + { + try { wait(); } - } - catch(InterruptedException ex) - { + catch (InterruptedException e) + { + // + // Eat the InterruptedException. + // + } } } current._leader = true; // The current thread has become the leader. @@ -777,18 +811,9 @@ public final class ThreadPool public void join() + throws InterruptedException { - while(true) - { - try - { - _thread.join(); - break; - } - catch(InterruptedException ex) - { - } - } + _thread.join(); } public void diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java index 3cf5bc4430e..c46fe75fd3a 100644 --- a/java/src/IceInternal/ThreadPoolWorkQueue.java +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -9,12 +9,14 @@ package IceInternal; +import java.util.concurrent.ExecutorService; + final class ThreadPoolWorkQueue extends EventHandler { - ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector) + ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector) { + _executor = instance.getQueueExecutor(); _threadPool = threadPool; - _instance = instance; _selector = selector; _destroyed = false; @@ -89,6 +91,7 @@ final class ThreadPoolWorkQueue extends EventHandler { throw new Ice.CommunicatorDestroyedException(); } + assert(item != null); _workItems.add(item); postMessage(); } @@ -115,6 +118,7 @@ final class ThreadPoolWorkQueue extends EventHandler if(!_workItems.isEmpty()) { workItem = _workItems.removeFirst(); + assert(workItem != null); } else { @@ -158,6 +162,25 @@ final class ThreadPoolWorkQueue extends EventHandler public void postMessage() { + if(_executor != null) + { + _executor.submit(new Runnable() { + + @Override + public void run() + { + postMessageInternal(); + } + }); + } + else { + postMessageInternal(); + } + } + + private void + postMessageInternal() + { java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); buf.put(0, (byte)0); while(buf.hasRemaining()) @@ -166,6 +189,13 @@ final class ThreadPoolWorkQueue extends EventHandler { _fdIntrWrite.write(buf); } + // + // This is thrown if the thread is interrupted. + // + catch(java.nio.channels.ClosedChannelException ex) + { + break; + } catch(java.io.IOException ex) { throw new Ice.SocketException(ex); @@ -174,7 +204,6 @@ final class ThreadPoolWorkQueue extends EventHandler } private final ThreadPool _threadPool; - private final Instance _instance; private final Selector _selector; boolean _destroyed; @@ -182,4 +211,5 @@ final class ThreadPoolWorkQueue extends EventHandler private java.nio.channels.WritableByteChannel _fdIntrWrite; private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); + private ExecutorService _executor; } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index ff4917d33c1..aaca6911829 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -58,7 +58,6 @@ final class UdpTransceiver implements Transceiver } @Override - @SuppressWarnings("deprecation") public int write(Buffer buf) { // @@ -135,7 +134,6 @@ final class UdpTransceiver implements Transceiver } @Override - @SuppressWarnings("deprecation") public int read(Buffer buf, Ice.BooleanHolder moreData) { if(!buf.b.hasRemaining()) @@ -306,7 +304,6 @@ final class UdpTransceiver implements Transceiver // // Only for use by UdpEndpoint // - @SuppressWarnings("deprecation") UdpTransceiver(ProtocolInstance instance, java.net.InetSocketAddress addr, java.net.InetSocketAddress sourceAddr, String mcastInterface, int mcastTtl) { @@ -346,7 +343,6 @@ final class UdpTransceiver implements Transceiver // // Only for use by UdpEndpoint // - @SuppressWarnings("deprecation") UdpTransceiver(ProtocolInstance instance, String host, int port, String mcastInterface, boolean connect) { _instance = instance; diff --git a/java/src/IceInternal/Util.java b/java/src/IceInternal/Util.java index 4bd8ef592e2..647262fe90b 100644 --- a/java/src/IceInternal/Util.java +++ b/java/src/IceInternal/Util.java @@ -9,8 +9,41 @@ package IceInternal; +import java.util.concurrent.ThreadFactory; + public final class Util { + static String + createThreadName(final Ice.Properties properties, final String name) { + String threadName = properties.getProperty("Ice.ProgramName"); + if(threadName.length() > 0) + { + threadName += "-"; + } + + threadName = threadName + name; + return threadName; + } + + static ThreadFactory + createThreadFactory(final Ice.Properties properties, final String name) { + return new java.util.concurrent.ThreadFactory() + { + @Override + public Thread newThread(Runnable r) + { + Thread t = new Thread(r); + t.setName(name); + + if(properties.getProperty("Ice.ThreadPriority").length() > 0) + { + t.setPriority(Util.getThreadPriorityProperty(properties, "Ice")); + } + return t; + } + }; + } + public static Instance getInstance(Ice.Communicator communicator) { |