diff options
Diffstat (limited to 'java/src')
30 files changed, 1533 insertions, 1750 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index df247ac9358..477dec5f5ac 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -16,6 +16,13 @@ package Ice; **/ public interface AsyncResult { + /** + * If not completed, cancels the request. This is a local + * operation, it won't cancel the request on the server side. The + * request won't be sent if it was waiting to be sent or the + * response will be ignored if it received after the callback. + **/ + public void cancel(); /** * Returns the communicator that sent the invocation. diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 76d46baab6a..fab8cf98605 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -263,8 +263,10 @@ public final class CommunicatorI implements Communicator // This callback object receives the results of all invocations // of Connection.begin_flushBatchRequests. // - IceInternal.CommunicatorBatchOutgoingAsync result = - new IceInternal.CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb); + IceInternal.CommunicatorFlushBatch result = new IceInternal.CommunicatorFlushBatch(this, + _instance, + __flushBatchRequests_name, + cb); connectionFactory.flushAsyncBatchRequests(result); adapterFactory.flushAsyncBatchRequests(result); @@ -282,8 +284,8 @@ public final class CommunicatorI implements Communicator public void end_flushBatchRequests(AsyncResult r) { - IceInternal.OutgoingAsyncBase ri = (IceInternal.OutgoingAsyncBase)r; - IceInternal.OutgoingAsyncBase.check(ri, this, __flushBatchRequests_name); + IceInternal.CommunicatorFlushBatch ri = + IceInternal.CommunicatorFlushBatch.check(r, this, __flushBatchRequests_name); ri.__wait(); } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 73880042340..9ee6ac10121 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -9,7 +9,8 @@ package Ice; -public final class ConnectionI extends IceInternal.EventHandler implements Connection, IceInternal.ResponseHandler +public final class ConnectionI extends IceInternal.EventHandler + implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler { public interface StartCallback { @@ -373,8 +374,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - - IceInternal.Protocol.headerSize - 4); + out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); int status; try @@ -388,6 +388,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException) _exception.fillInStackTrace(); } + if(response || (status & IceInternal.AsyncStatus.Queued) > 0) + { + out.cancelable(this); // Notify the request that it's cancelable + } + if(response) { // @@ -640,29 +645,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) { - IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this, - _communicator, _instance, __flushBatchRequests_name, cb); - try - { - result.__invoke(); - } - catch(LocalException __ex) - { - result.invokeExceptionAsync(__ex); - } - + IceInternal.ConnectionFlushBatch result = + new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb); + result.invoke(); return result; } @Override public void end_flushBatchRequests(AsyncResult ir) { - IceInternal.OutgoingAsyncBase r = (IceInternal.OutgoingAsyncBase) ir; - IceInternal.OutgoingAsyncBase.check(r, this, __flushBatchRequests_name); + IceInternal.ConnectionFlushBatch r = + IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name); r.__wait(); } - synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) + synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync) { waitBatchStreamInUse(); @@ -687,11 +684,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.pos(IceInternal.Protocol.headerSize); _batchStream.writeInt(_batchRequestNum); - outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() - - IceInternal.Protocol.headerSize - 4); - _batchStream.swap(outAsync.getOs()); + outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0); + // // Send the batch stream. // @@ -708,6 +704,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException) _exception.fillInStackTrace(); } + if((status & IceInternal.AsyncStatus.Queued) > 0) + { + outAsync.cancelable(this); // Notify the request that it's cancelable. + } + // // Reset the batch stream. // @@ -728,12 +729,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(callback != null) { - class CallbackWorkItem extends IceInternal.DispatchWorkItem + _threadPool.dispatch(new IceInternal.DispatchWorkItem(this) { - public CallbackWorkItem() - { - } - @Override public void run() { @@ -746,8 +743,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } } - }; - _threadPool.dispatch(new CallbackWorkItem()); + }); } } else @@ -793,9 +789,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, - Ice.LocalException ex) + @Override + synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex) { + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) { @@ -815,13 +816,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // it's fine if the OutgoingAsync output stream is released (and // as long as canceled requests cannot be retried). // - o.timedOut(); + o.canceled(); if(o != _sendStreams.getFirst()) { it.remove(); } - outAsync.dispatchInvocationCancel(ex, _threadPool, this); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } } @@ -834,12 +838,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(it2.next() == o) { it2.remove(); - outAsync.dispatchInvocationCancel(ex, _threadPool, this); - return true; // We're done. + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } } } } - return false; } @Override @@ -1469,7 +1474,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne for(OutgoingMessage p : _sendStreams) { - p.finished(_exception); + p.completed(_exception); if(p.requestId > 0) // Make sure finished isn't called twice. { _asyncRequests.remove(p.requestId); @@ -1480,7 +1485,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne for(IceInternal.OutgoingAsync p : _asyncRequests.values()) { - p.finished(_exception); + if(p.completed(_exception)) + { + p.invokeCompleted(); + } } _asyncRequests.clear(); @@ -2580,7 +2588,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); - if(outAsync != null && outAsync.finished(info.stream)) + if(outAsync != null && outAsync.completed(info.stream)) { info.outAsync = outAsync; ++info.messageDispatchCount; @@ -2999,7 +3007,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = 0; } - OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress, + OutgoingMessage(IceInternal.OutgoingAsyncBase out, IceInternal.BasicStream stream, boolean compress, int requestId) { this.stream = stream; @@ -3008,7 +3016,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = requestId; } - public void timedOut() + public void canceled() { assert (outAsync != null); outAsync = null; @@ -3035,16 +3043,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return false; } - public void finished(Ice.LocalException ex) + public void completed(Ice.LocalException ex) { - if(outAsync != null) + if(outAsync != null && outAsync.completed(ex)) { - outAsync.finished(ex); + outAsync.invokeCompleted(); } } public IceInternal.BasicStream stream; - public IceInternal.OutgoingAsyncMessageCallback outAsync; + public IceInternal.OutgoingAsyncBase outAsync; public boolean compress; public int requestId; boolean adopt; diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 7538fc16ced..48ad2739082 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -13,8 +13,6 @@ import java.util.Map; public final class ObjectAdapterI implements ObjectAdapter { - - @Override public String getName() @@ -751,7 +749,7 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushAsyncBatchRequests(IceInternal.CommunicatorBatchOutgoingAsync outAsync) + flushAsyncBatchRequests(IceInternal.CommunicatorFlushBatch outAsync) { java.util.List<IceInternal.IncomingConnectionFactory> f; synchronized(this) diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 17b426c69cb..3f7bb25d9ad 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -286,11 +286,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.BasicStream __os = __result.startWriteParams(Ice.FormatType.DefaultFormat); __os.writeString(__id); __result.endWriteParams(); - __result.invoke(true); + __result.invoke(); } catch(Exception __ex) { - __result.invokeExceptionAsync(__ex); + __result.abort(__ex); } return __result; } @@ -305,8 +305,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final boolean end_ice_isA(AsyncResult __iresult) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_isA_name); + IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_isA_name); try { if(!__result.__wait()) @@ -544,11 +543,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { __result.prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); __result.writeEmptyParams(); - __result.invoke(true); + __result.invoke(); } catch(Exception __ex) { - __result.invokeExceptionAsync(__ex); + __result.abort(__ex); } return __result; } @@ -777,11 +776,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { __result.prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); __result.writeEmptyParams(); - __result.invoke(true); + __result.invoke(); } catch(Exception __ex) { - __result.invokeExceptionAsync(__ex); + __result.abort(__ex); } return __result; } @@ -797,8 +796,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final String[] end_ice_ids(AsyncResult __iresult) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_ids_name); + IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_ids_name); try { if(!__result.__wait()) @@ -1054,11 +1052,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { __result.prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); __result.writeEmptyParams(); - __result.invoke(true); + __result.invoke(); } catch(Exception __ex) { - __result.invokeExceptionAsync(__ex); + __result.abort(__ex); } return __result; } @@ -1073,8 +1071,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final String end_ice_id(AsyncResult __iresult) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_id_name); + IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_id_name); try { if(!__result.__wait()) @@ -1444,11 +1441,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { __result.prepare(operation, mode, __context, __explicitCtx, __synchronous); __result.writeParamEncaps(inParams); - __result.invoke(true); + __result.invoke(); } catch(Exception __ex) { - __result.invokeExceptionAsync(__ex); + __result.abort(__ex); } return __result; } @@ -1468,8 +1465,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final boolean end_ice_invoke(ByteSeqHolder outParams, AsyncResult __iresult) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_invoke_name); + IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_invoke_name); try { boolean ok = __result.__wait(); @@ -2427,49 +2423,47 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private static final String __ice_getConnection_name = "ice_getConnection"; private AsyncResult - begin_ice_getConnectionInternal(IceInternal.CallbackBase __cb) + begin_ice_getConnectionInternal(IceInternal.CallbackBase cb) { - IceInternal.GetConnectionOutgoingAsync __result = - new IceInternal.GetConnectionOutgoingAsync(this, __ice_getConnection_name, __cb); + IceInternal.ProxyGetConnection result = new IceInternal.ProxyGetConnection(this, __ice_getConnection_name, cb); try { - __result.__invoke(); + result.invoke(); } - catch(Exception __ex) + catch(Exception ex) { - __result.invokeExceptionAsync(__ex); + result.abort(ex); } - return __result; + return result; } @Override public Ice.Connection - end_ice_getConnection(AsyncResult __iresult) + end_ice_getConnection(AsyncResult r) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_getConnection_name); - __result.__wait(); + IceInternal.ProxyGetConnection result = IceInternal.ProxyGetConnection.check(r, this, __ice_getConnection_name); + result.__wait(); return ice_getCachedConnection(); } - static public final void __ice_getConnection_completed(TwowayCallbackArg1<Ice.Connection> __cb, AsyncResult __result) + static public final void __ice_getConnection_completed(TwowayCallbackArg1<Ice.Connection> cb, AsyncResult result) { - Ice.Connection __ret = null; + Ice.Connection ret = null; try { - __ret = __result.getProxy().end_ice_getConnection(__result); + ret = result.getProxy().end_ice_getConnection(result); } - catch(LocalException __ex) + catch(LocalException ex) { - __cb.exception(__ex); + cb.exception(ex); return; } - catch(SystemException __ex) + catch(SystemException ex) { - __cb.exception(__ex); + cb.exception(ex); return; } - __cb.response(__ret); + cb.response(ret); } /** @@ -2578,28 +2572,26 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private static final String __ice_flushBatchRequests_name = "ice_flushBatchRequests"; private AsyncResult - begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase __cb) + begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase cb) { - IceInternal.ProxyBatchOutgoingAsync __result = - new IceInternal.ProxyBatchOutgoingAsync(this, __ice_flushBatchRequests_name, __cb); + IceInternal.ProxyFlushBatch result = new IceInternal.ProxyFlushBatch(this, __ice_flushBatchRequests_name, cb); try { - __result.__invoke(); + result.invoke(); } - catch(Exception __ex) + catch(Exception ex) { - __result.invokeExceptionAsync(__ex); + result.abort(ex); } - return __result; + return result; } @Override public void - end_ice_flushBatchRequests(AsyncResult __iresult) + end_ice_flushBatchRequests(AsyncResult r) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, __ice_flushBatchRequests_name); - __result.__wait(); + IceInternal.ProxyFlushBatch result = IceInternal.ProxyFlushBatch.check(r, this, __ice_flushBatchRequests_name); + result.__wait(); } /** @@ -2721,34 +2713,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } public final void - __end(AsyncResult __iresult, String operation) + __end(AsyncResult r, String operation) { - IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult; - IceInternal.OutgoingAsyncBase.check(__result, this, operation); + IceInternal.ProxyOutgoingAsyncBase result = IceInternal.ProxyOutgoingAsyncBase.check(r, this, operation); try { - boolean ok = __result.__wait(); + boolean ok = result.__wait(); if(_reference.getMode() == IceInternal.Reference.ModeTwoway) { + IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)result; if(!ok) { try { - __result.throwUserException(); + outAsync.throwUserException(); } - catch(UserException __ex) + catch(UserException ex) { - throw new UnknownUserException(__ex.ice_name(), __ex); + throw new UnknownUserException(ex.ice_name(), ex); } } - __result.readEmptyParams(); + outAsync.readEmptyParams(); } } finally { - if(__result != null) + if(result != null) { - __result.cacheMessageBuffers(); + result.cacheMessageBuffers(); } } } diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java new file mode 100644 index 00000000000..9d7445584fe --- /dev/null +++ b/java/src/IceInternal/AsyncResultI.java @@ -0,0 +1,475 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import Ice.AsyncResult; +import Ice.Communicator; +import Ice.CommunicatorDestroyedException; +import Ice.Connection; + +public class AsyncResultI implements AsyncResult +{ + @Override + public void cancel() + { + cancel(new Ice.InvocationCanceledException()); + } + + @Override + public Communicator getCommunicator() + { + return _communicator; + } + + @Override + public Connection getConnection() + { + return null; + } + + @Override + public Ice.ObjectPrx getProxy() + { + return null; + } + + @Override + public final boolean isCompleted() + { + synchronized(this) + { + return (_state & StateDone) > 0; + } + } + + @Override + public final void waitForCompleted() + { + synchronized(this) + { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + while((_state & StateDone) == 0) + { + try + { + this.wait(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } + } + } + } + + @Override + public final boolean isSent() + { + synchronized(this) + { + return (_state & StateSent) > 0; + } + } + + @Override + public final void waitForSent() + { + synchronized(this) + { + if(Thread.interrupted()) + { + throw new Ice.OperationInterruptedException(); + } + while((_state & StateSent) == 0 && _exception == null) + { + try + { + this.wait(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } + } + } + } + + @Override + public final void throwLocalException() + { + synchronized(this) + { + if(_exception != null) + { + throw _exception; + } + } + } + + @Override + public final boolean sentSynchronously() + { + return _sentSynchronously; // No lock needed, immutable + } + + @Override + public final String getOperation() + { + return _operation; + } + + public final void invokeSent() + { + assert(_callback != null); + + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); + } + + try + { + _callback.__sent(this); + } + catch(RuntimeException ex) + { + warning(ex); + } + catch(AssertionError exc) + { + error(exc); + } + catch(OutOfMemoryError exc) + { + error(exc); + } + finally + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(null); + } + } + + if(_observer != null) + { + Ice.ObjectPrx proxy = getProxy(); + if(proxy == null || !proxy.ice_isTwoway()) + { + _observer.detach(); + _observer = null; + } + } + } + + public final void invokeCompleted() + { + assert(_callback != null); + + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); + } + + try + { + _callback.__completed(this); + } + catch(RuntimeException ex) + { + warning(ex); + } + catch(AssertionError exc) + { + error(exc); + } + catch(OutOfMemoryError exc) + { + error(exc); + } + finally + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(null); + } + } + + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + } + + public final void invokeCompletedAsync() + { + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance.clientThreadPool().dispatch(new DispatchWorkItem(_cachedConnection) + { + @Override + public void run() + { + invokeCompleted(); + } + }); + } + + public void cancelable(final CancellationHandler handler) + { + synchronized(this) + { + if(_cancellationException == null) + { + _cancellationHandler = handler; + return; + } + } + handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException); + } + + public final boolean __wait() + { + try + { + synchronized(this) + { + if((_state & StateEndCalled) > 0) + { + throw new IllegalArgumentException("end_ method called more than once"); + } + + _state |= StateEndCalled; + if(Thread.interrupted()) + { + throw new InterruptedException(); + } + while((_state & StateDone) == 0) + { + this.wait(); + } + + if(_exception != null) + { + throw (Ice.Exception)_exception.fillInStackTrace(); + } + + return (_state & StateOK) > 0; + } + } + catch(InterruptedException ex) + { + Ice.OperationInterruptedException exc = new Ice.OperationInterruptedException(); + cancel(exc); // Must be called outside the synchronization + throw exc; + } + } + + public void cacheMessageBuffers() + { + } + + protected AsyncResultI(Communicator communicator, Instance instance, String op, CallbackBase del) + { + _communicator = communicator; + _instance = instance; + _operation = op; + _state = 0; + _sentSynchronously = false; + _exception = null; + _callback = del; + } + + protected boolean sent(boolean done) + { + synchronized(this) + { + assert(_exception == null); + + boolean alreadySent = (_state & StateSent) != 0; + _state |= StateSent; + if(done) + { + _state |= StateDone | StateOK; + _cancellationHandler = null; + if(_observer != null && (_callback == null || !_callback.__hasSentCallback())) + { + _observer.detach(); + _observer = null; + } + + // + // For oneway requests after the data has been sent + // the buffers can be reused unless this is a + // collocated invocation. For collocated invocations + // the buffer won't be reused because it has already + // been marked as cached in invokeCollocated. + // + cacheMessageBuffers(); + } + this.notifyAll(); + return !alreadySent && _callback != null && _callback.__hasSentCallback(); + } + } + + protected boolean finished(boolean ok) + { + synchronized(this) + { + _state |= StateDone; + if(ok) + { + _state |= StateOK; + } + _cancellationHandler = null; + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + } + this.notifyAll(); + return _callback != null; + } + } + + protected boolean finished(Ice.Exception ex) + { + synchronized(this) + { + _state |= StateDone; + _exception = ex; + _cancellationHandler = null; + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + } + this.notifyAll(); + return _callback != null; + } + } + + protected final void invokeSentAsync() + { + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + _instance.clientThreadPool().dispatch(new DispatchWorkItem(_cachedConnection) + { + @Override + public void run() + { + invokeSent(); + } + }); + } + catch(CommunicatorDestroyedException exc) + { + } + } + + protected void cancel(Ice.LocalException ex) + { + synchronized(this) + { + _cancellationException = ex; + if(_cancellationHandler == null) + { + return; + } + } + _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex); + } + + protected void checkCanceled() + { + synchronized(this) + { + if(_cancellationException != null) + { + throw _cancellationException; + } + } + } + + protected Ice.Instrumentation.InvocationObserver getObserver() + { + return _observer; + } + + protected static void check(AsyncResult r, String operation) + { + if(r == null) + { + throw new IllegalArgumentException("AsyncResult == null"); + } + else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality + { + throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " + + r.getOperation()); + } + } + + private final void warning(RuntimeException ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + String s = "exception raised by AMI callback:\n" + Ex.toString(ex); + _instance.initializationData().logger.warning(s); + } + } + + private final void error(Error error) + { + String s = "error raised by AMI callback:\n" + Ex.toString(error); + _instance.initializationData().logger.error(s); + } + + protected final Instance _instance; + protected Ice.Instrumentation.InvocationObserver _observer; + protected Connection _cachedConnection; + protected boolean _sentSynchronously; + + private final Communicator _communicator; + private final String _operation; + private final CallbackBase _callback; + + private Ice.Exception _exception; + + private CancellationHandler _cancellationHandler; + private Ice.LocalException _cancellationException; + + protected static final byte StateOK = 0x1; + protected static final byte StateDone = 0x2; + protected static final byte StateSent = 0x4; + protected static final byte StateEndCalled = 0x8; + protected static final byte StateCachedBuffers = 0x10; + protected byte _state; +} diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java deleted file mode 100644 index ff8ba712185..00000000000 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ /dev/null @@ -1,117 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -abstract public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback -{ - BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) - { - super(communicator, instance, operation, callback); - } - - @Override - public int - send(Ice.ConnectionI connection, boolean compress, boolean response) - { - _cachedConnection = connection; - return connection.flushAsyncBatchRequests(this); - } - - @Override - public int - invokeCollocated(CollocatedRequestHandler handler) - { - return handler.invokeAsyncBatchRequests(this); - } - - @Override - public boolean - sent() - { - synchronized(_monitor) - { - _state |= StateDone | StateOK | StateSent; - //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - if(_timeoutRequestHandler != null) - { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; - } - _monitor.notifyAll(); - - if(_callback == null || !_callback.__hasSentCallback()) - { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return false; - } - return true; - } - } - - @Override - public void - invokeSent() - { - invokeSentInternal(); - } - - @Override - public void - finished(Ice.Exception exc) - { - synchronized(_monitor) - { - if(_childObserver != null) - { - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - if(_timeoutRequestHandler != null) - { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; - } - } - invokeException(exc); - } - - @Override - public void - processRetry() - { - assert(false); // Retries are never scheduled - } - - @Override - public void - dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) - { - threadPool.dispatch(new DispatchWorkItem(connection) - { - @Override - public void run() - { - BatchOutgoingAsync.this.finished(ex); - } - }); - } -} diff --git a/java/src/IceInternal/CancellationHandler.java b/java/src/IceInternal/CancellationHandler.java new file mode 100644 index 00000000000..0182f403cb4 --- /dev/null +++ b/java/src/IceInternal/CancellationHandler.java @@ -0,0 +1,15 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public interface CancellationHandler +{ + void asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex); +} diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 8de4a93ebc4..48d7bfa5b7d 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -13,8 +13,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { private class InvokeAllAsync extends DispatchWorkItem { - private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, - boolean batch) + private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch) { _outAsync = outAsync; _os = os; @@ -32,7 +31,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - private final OutgoingAsyncMessageCallback _outAsync; + private final OutgoingAsyncBase _outAsync; private BasicStream _os; private final int _requestId; private final int _invokeNum; @@ -186,14 +185,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public int - sendAsyncRequest(OutgoingAsyncMessageCallback outAsync) + sendAsyncRequest(OutgoingAsyncBase outAsync) { return outAsync.invokeCollocated(this); } @Override - synchronized public boolean - asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) + synchronized public void + asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex) { Integer requestId = _sendAsyncRequests.get(outAsync); if(requestId != null) @@ -203,8 +202,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } if(outAsync instanceof OutgoingAsync) @@ -216,12 +218,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } } } - return false; } @Override @@ -242,9 +246,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } outAsync = _asyncRequests.remove(requestId); + if(outAsync != null && !outAsync.completed(os)) + { + outAsync = null; + } } - if(outAsync != null && outAsync.finished(os)) + if(outAsync != null) { outAsync.invokeCompleted(); } @@ -271,18 +279,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler public void invokeException(int requestId, Ice.LocalException ex, int invokeNum) { - if(requestId > 0) - { - OutgoingAsync outAsync = null; - synchronized(this) - { - outAsync = _asyncRequests.remove(requestId); - } - if(outAsync != null) - { - outAsync.finished(ex); - } - } + handleException(requestId, ex); _adapter.decDirectCount(); } @@ -307,7 +304,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return null; } - void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) + int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) { int requestId = 0; if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response) @@ -323,6 +320,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _sendAsyncRequests.put(outAsync, requestId); } + outAsync.cancelable(this); } } @@ -355,9 +353,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); } + return AsyncStatus.Queued; } - int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) + int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync) { int invokeNum; synchronized(this) @@ -370,6 +369,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { _sendAsyncRequests.put(outAsync, 0); + outAsync.cancelable(this); } assert(!_batchStream.isEmpty()); @@ -404,7 +404,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private boolean - sentAsync(final OutgoingAsyncMessageCallback outAsync) + sentAsync(final OutgoingAsyncBase outAsync) { if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { @@ -511,10 +511,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler synchronized(this) { outAsync = _asyncRequests.remove(requestId); + if(outAsync != null && !outAsync.completed(ex)) + { + outAsync = null; + } } + if(outAsync != null) { - outAsync.finished(ex); + outAsync.invokeCompleted(); } } @@ -567,8 +572,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler // A map of outstanding requests that can be canceled. A request // can be canceled if it has an invocation timeout, or we support // interrupts. - private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests = - new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>(); + private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests = + new java.util.HashMap<OutgoingAsyncBase, Integer>(); private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>(); diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorFlushBatch.java index ab1854cbce1..19e55ecc91c 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorFlushBatch.java @@ -16,12 +16,29 @@ import java.util.concurrent.RejectedExecutionException; import Ice.CommunicatorDestroyedException; -public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase +public class CommunicatorFlushBatch extends IceInternal.AsyncResultI { - public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, - CallbackBase callback) + public static CommunicatorFlushBatch check(Ice.AsyncResult r, Ice.Communicator com, String operation) { - super(communicator, instance, operation, callback); + check(r, operation); + if(!(r instanceof CommunicatorFlushBatch)) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + if(r.getCommunicator() != com) + { + throw new IllegalArgumentException("Communicator for call to end_" + operation + + " does not match communicator that was used to call corresponding " + + "begin_" + operation + " method"); + } + return (CommunicatorFlushBatch)r; + } + + public CommunicatorFlushBatch(Ice.Communicator communicator, Instance instance, String op, CallbackBase callback) + { + super(communicator, instance, op, callback); + + _observer = ObserverHelper.get(instance, op); // // _useCount is initialized to 1 to prevent premature callbacks. @@ -29,34 +46,22 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase // been initiated. // _useCount = 1; - - // - // Assume all connections are flushed synchronously. - // - _sentSynchronously = true; - - // - // Attach observer - // - _observer = ObserverHelper.get(instance, operation); } public void flushConnection(final Ice.ConnectionI con) { - class BatchOutgoingAsyncI extends BatchOutgoingAsync + class FlushBatch extends OutgoingAsyncBase { - public - BatchOutgoingAsyncI() + public FlushBatch() { - super(CommunicatorBatchOutgoingAsync.this._communicator, - CommunicatorBatchOutgoingAsync.this._instance, - CommunicatorBatchOutgoingAsync.this._operation, + super(CommunicatorFlushBatch.this.getCommunicator(), + CommunicatorFlushBatch.this._instance, + CommunicatorFlushBatch.this.getOperation(), null); } @Override - public boolean - sent() + public boolean sent() { if(_childObserver != null) { @@ -69,8 +74,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase // TODO: MJN: This is missing a test. @Override - public void - finished(Ice.Exception ex) + public boolean completed(Ice.Exception ex) { if(_childObserver != null) { @@ -79,37 +83,23 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase _childObserver = null; } doCheck(false); + return false; } - @Override - public void - attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) - { - if(CommunicatorBatchOutgoingAsync.this._observer != null) - { - _childObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt, - requestId, size); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - @Override - protected void cancelRequest() + @Override + protected Ice.Instrumentation.InvocationObserver getObserver() { + return CommunicatorFlushBatch.this._observer; } } - synchronized(_monitor) + synchronized(this) { ++_useCount; } try { - int status; if(_instance.queueRequests()) { Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() @@ -117,7 +107,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase @Override public Integer call() throws RetryException { - return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + return con.flushAsyncBatchRequests(new FlushBatch()); } }); @@ -126,7 +116,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase { try { - status = future.get(); + future.get(); if(interrupted) { Thread.currentThread().interrupt(); @@ -160,11 +150,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase } else { - status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); - } - if((status & AsyncStatus.Sent) > 0) - { - _sentSynchronously = false; + con.flushAsyncBatchRequests(new FlushBatch()); } } catch(Ice.LocalException ex) @@ -181,53 +167,28 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase private void doCheck(boolean userThread) { - synchronized(_monitor) + synchronized(this) { assert(_useCount > 0); if(--_useCount > 0) { return; } - _state |= StateDone | StateOK | StateSent; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation - _monitor.notifyAll(); } - if(_callback == null || !_callback.__hasSentCallback()) - { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - } - else + if(sent(true)) { - // - // sentSynchronously_ is immutable here. - // - if(!_sentSynchronously || !userThread) + if(userThread) { - invokeSentAsync(); + _sentSynchronously = true; + invokeSent(); } else { - invokeSentInternal(); + invokeSentAsync(); } } } - @Override - public void - processRetry() - { - assert(false); // Retries are never scheduled - } - - @Override - protected void cancelRequest() - { - } - private int _useCount; } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 91957af1a0c..e4c1e6b1727 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -22,12 +22,12 @@ public class ConnectRequestHandler this.os.swap(os); } - Request(OutgoingAsyncMessageCallback out) + Request(OutgoingAsyncBase out) { this.outAsync = out; } - OutgoingAsyncMessageCallback outAsync = null; + OutgoingAsyncBase outAsync = null; BasicStream os = null; } @@ -149,7 +149,7 @@ public class ConnectRequestHandler @Override public int - sendAsyncRequest(OutgoingAsyncMessageCallback out) + sendAsyncRequest(OutgoingAsyncBase out) throws RetryException { synchronized(this) @@ -159,6 +159,7 @@ public class ConnectRequestHandler if(!initialized()) { _requests.add(new Request(out)); + out.cancelable(this); return AsyncStatus.Queued; } } @@ -171,14 +172,14 @@ public class ConnectRequestHandler } @Override - public boolean - asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) + public void + asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex) { synchronized(this) { if(_exception != null) { - return false; // The request has been notified of a failure already. + return; // The request has been notified of a failure already. } if(!initialized()) @@ -190,14 +191,17 @@ public class ConnectRequestHandler if(request.outAsync == outAsync) { it.remove(); - outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); - return true; // We're done + if(outAsync.completed(ex)) + { + outAsync.invokeCompletedAsync(); + } + return; } } assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - return _connection.asyncRequestCanceled(outAsync, ex); + _connection.asyncRequestCanceled(outAsync, ex); } @Override @@ -394,8 +398,7 @@ public class ConnectRequestHandler _flushing = true; } - final java.util.List<OutgoingAsyncMessageCallback> sentCallbacks = - new java.util.ArrayList<OutgoingAsyncMessageCallback>(); + final java.util.List<OutgoingAsyncBase> sentCallbacks = new java.util.ArrayList<OutgoingAsyncBase>(); try { java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true @@ -476,7 +479,7 @@ public class ConnectRequestHandler @Override public void run() { - for(OutgoingAsyncMessageCallback callback : sentCallbacks) + for(OutgoingAsyncBase callback : sentCallbacks) { callback.invokeSent(); } @@ -546,7 +549,10 @@ public class ConnectRequestHandler { if(request.outAsync != null) { - request.outAsync.finished(_exception); + if(request.outAsync.completed(_exception)) + { + request.outAsync.invokeCompleted(); + } } } _requests.clear(); diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java deleted file mode 100644 index 5a1f0a30886..00000000000 --- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java +++ /dev/null @@ -1,107 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; - -import Ice.CommunicatorDestroyedException; - -public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync -{ - public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, - String operation, CallbackBase callback) - { - super(communicator, instance, operation, callback); - _connection = con; - } - - public void __invoke() - { - int status; - if(_instance.queueRequests()) - { - Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>() - { - @Override - public Integer call() throws RetryException - { - return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this); - } - }); - - boolean interrupted = false; - while(true) - { - try - { - status = future.get(); - if(interrupted) - { - Thread.currentThread().interrupt(); - } - break; - } - catch(InterruptedException ex) - { - interrupted = true; - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch(ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException ex) - { - throw ex; - } - catch(Throwable ex) - { - assert(false); - } - } - } - } - else - { - status = _connection.flushAsyncBatchRequests(this); - } - - if((status & AsyncStatus.Sent) > 0) - { - _sentSynchronously = true; - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - invokeSent(); - } - } - } - - @Override - public Ice.Connection getConnection() - { - return _connection; - } - - @Override - protected void cancelRequest() - { - _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); - } - - private Ice.ConnectionI _connection; -} diff --git a/java/src/IceInternal/ConnectionFlushBatch.java b/java/src/IceInternal/ConnectionFlushBatch.java new file mode 100644 index 00000000000..4b3da0bcb5e --- /dev/null +++ b/java/src/IceInternal/ConnectionFlushBatch.java @@ -0,0 +1,128 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import Ice.CommunicatorDestroyedException; + +public class ConnectionFlushBatch extends OutgoingAsyncBase +{ + public static ConnectionFlushBatch check(Ice.AsyncResult r, Ice.Connection con, String operation) + { + check(r, operation); + if(!(r instanceof ConnectionFlushBatch)) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + if(r.getConnection() != con) + { + throw new IllegalArgumentException("Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } + return (ConnectionFlushBatch)r; + } + + public ConnectionFlushBatch(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance, + String operation, CallbackBase callback) + { + super(communicator, instance, operation, callback); + _connection = con; + } + + @Override + public Ice.Connection getConnection() + { + return _connection; + } + + public void invoke() + { + try + { + int status; + if(_instance.queueRequests()) + { + Future<Integer> future = _instance.getQueueExecutor().submit( + new Callable<Integer>() + { + @Override + public Integer call() throws RetryException + { + return _connection.flushAsyncBatchRequests(ConnectionFlushBatch.this); + } + }); + + boolean interrupted = false; + while(true) + { + try + { + status = future.get(); + if(interrupted) + { + Thread.currentThread().interrupt(); + } + break; + } + catch(InterruptedException ex) + { + interrupted = true; + } + catch(RejectedExecutionException e) + { + throw new CommunicatorDestroyedException(); + } + catch(ExecutionException e) + { + try + { + throw e.getCause(); + } + catch(RuntimeException ex) + { + throw ex; + } + catch(Throwable ex) + { + assert(false); + } + } + } + } + else + { + status = _connection.flushAsyncBatchRequests(this); + } + + if((status & AsyncStatus.Sent) > 0) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + invokeSent(); + } + } + } + catch(Ice.Exception ex) + { + if(completed(ex)) + { + invokeCompletedAsync(); + } + } + } + + private Ice.ConnectionI _connection; +} diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index f7ca7bb5b29..924b633e670 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -69,17 +69,17 @@ public class ConnectionRequestHandler implements RequestHandler } @Override - public int sendAsyncRequest(OutgoingAsyncMessageCallback out) + public int sendAsyncRequest(OutgoingAsyncBase out) throws RetryException { return out.send(_connection, _compress, _response); } @Override - public boolean - asyncRequestCanceled(OutgoingAsyncMessageCallback outgoingAsync, Ice.LocalException ex) + public void + asyncRequestCanceled(OutgoingAsyncBase outgoingAsync, Ice.LocalException ex) { - return _connection.asyncRequestCanceled(outgoingAsync, ex); + _connection.asyncRequestCanceled(outgoingAsync, ex); } @Override @@ -103,8 +103,8 @@ public class ConnectionRequestHandler implements RequestHandler return _connection; } - public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, - boolean compress) { + public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress) + { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; _connection = connection; diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java deleted file mode 100644 index 55653630ca2..00000000000 --- a/java/src/IceInternal/GetConnectionOutgoingAsync.java +++ /dev/null @@ -1,148 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public class GetConnectionOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback -{ - public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase cb) - { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb); - _proxy = (Ice.ObjectPrxHelperBase) prx; - _cnt = 0; - _observer = ObserverHelper.get(prx, operation); - } - - public void __invoke() - { - while(true) - { - try - { - _handler = _proxy.__getRequestHandler(); - _handler.sendAsyncRequest(this); - } - catch(RetryException ex) - { - _proxy.__setRequestHandler(_handler, null); - } - catch(Ice.Exception ex) - { - handleException(ex); - } - break; - } - } - - @Override - public Ice.ObjectPrx getProxy() - { - return _proxy; - } - - @Override - public int send(Ice.ConnectionI conection, boolean compress, boolean response) - throws RetryException - { - sent(); - return 0; - } - - @Override - public int invokeCollocated(CollocatedRequestHandler handler) - { - sent(); - return 0; - } - - @Override - public boolean sent() - { - synchronized(_monitor) - { - _state |= StateDone; - _monitor.notifyAll(); - } - invokeCompleted(); - return false; - } - - @Override - public void invokeSent() - { - // No sent callback - } - - @Override - public void finished(Ice.Exception exc) - { - try - { - handleException(exc); - } - catch(Ice.Exception ex) - { - invokeExceptionAsync(ex); - } - } - - @Override - void processRetry() - { - __invoke(); - } - - @Override - public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) - { - threadPool.dispatch(new DispatchWorkItem(connection) - { - @Override - public void run() - { - GetConnectionOutgoingAsync.this.finished(ex); - } - }); - } - - @Override - protected void cancelRequest() - { - if(_handler != null) - { - _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); - } - } - - private void handleException(Ice.Exception exc) - { - try - { - Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); - _cnt = _proxy.__handleException(exc, _handler, Ice.OperationMode.Idempotent, false, interval, _cnt); - if(_observer != null) - { - _observer.retried(); // Invocation is being retried - } - _instance.retryQueue().add(this, interval.value); - } - catch(Ice.Exception ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - throw ex; - } - } - - private Ice.ObjectPrxHelperBase _proxy; - private RequestHandler _handler = null; - private int _cnt; -} diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 89dcf9b44ed..31306a14ca6 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -170,7 +170,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } public void - flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) + flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) { for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index 63c3e592669..a723537cd7e 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -218,7 +218,7 @@ public final class ObjectAdapterFactory } public void - flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) + flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) { java.util.List<Ice.ObjectAdapterI> adapters; synchronized(this) diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index ef4d3d7c959..e42c817b42c 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,41 +9,47 @@ package IceInternal; -public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback +public class OutgoingAsync extends ProxyOutgoingAsyncBase { + public static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation) + { + ProxyOutgoingAsyncBase.checkImpl(r, prx, operation); + try + { + return (OutgoingAsync)r; + } + catch(ClassCastException ex) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + } + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb); - _proxy = (Ice.ObjectPrxHelperBase) prx; + super((Ice.ObjectPrxHelperBase)prx, operation, cb); _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + _is = null; } - public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, - IceInternal.BasicStream os) + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, BasicStream is, BasicStream os) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb, - is, os); - _proxy = (Ice.ObjectPrxHelperBase) prx; + super((Ice.ObjectPrxHelperBase)prx, operation, cb, os); _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + _is = is; } public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, boolean explicitCtx, boolean synchronous) { - _handler = null; - _cnt = 0; - _sent = false; + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); + _mode = mode; - _sentSynchronously = false; _synchronous = synchronous; - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); - if(explicitCtx && ctx == null) { ctx = _emptyContext; } - _observer = ObserverHelper.get(_proxy, operation, ctx); switch(_proxy.__reference().getMode()) @@ -137,12 +143,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes } @Override - public Ice.ObjectPrx getProxy() - { - return _proxy; - } - - @Override public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; @@ -158,117 +158,52 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes // Disable caching by marking the streams as cached! _state |= StateCachedBuffers; } - handler.invokeAsyncRequest(this, _synchronous); - return AsyncStatus.Queued; + return handler.invokeAsyncRequest(this, _synchronous); } @Override - public boolean sent() + public void abort(Ice.Exception ex) { - synchronized(_monitor) + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - boolean alreadySent = (_state & StateSent) != 0; - _state |= StateSent; - _sent = true; - - assert ((_state & StateDone) == 0); - - if(!_proxy.ice_isTwoway()) + if(_handler != null) { - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - if(_observer != null && (_callback == null || !_callback.__hasSentCallback())) - { - _observer.detach(); - _observer = null; - } - if(_timeoutRequestHandler != null) - { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; - } - _state |= StateDone | StateOK; - // _os.resize(0, false); // Don't clear the buffer now, it's - // needed for the collocation optimization - - // For oneway requests after the data has been sent the buffers - // can be reused unless this is a collocated invocation. For - // collocated invocations the buffer won't be reused as the - // because it has already been marked as cached in - // invokeCollocated. - cacheMessageBuffers(); + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _handler.abortBatchRequest(); } - _monitor.notifyAll(); - - // Don't call the sent call is already sent. - return !alreadySent && _callback != null && _callback.__hasSentCallback(); } - } - @Override - public void invokeSent() - { - invokeSentInternal(); + super.abort(ex); } - @Override - public void finished(Ice.Exception exc) + public void invoke() { - synchronized(_monitor) + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - assert ((_state & StateDone) == 0); - if(_childObserver != null) - { - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - if(_timeoutRequestHandler != null) + if(_handler != null) { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; + _sentSynchronously = true; + _handler.finishBatchRequest(_os); + finished(true); } + return; // Don't call sent/completed callback for batch AMI requests } // - // NOTE: at this point, synchronization isn't needed, no other threads - // should be calling on the callback. + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. // - try - { - handleException(exc); - } - catch(Ice.Exception ex) - { - invokeException(ex); - } - } - - @Override - void processRetry() - { - invoke(false); - } - - @Override - public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) - { - threadPool.dispatch(new DispatchWorkItem(connection) - { - @Override - public void run() - { - OutgoingAsync.this.finished(ex); - } - }); + invokeImpl(true); // userThread = true } - public final boolean finished(BasicStream is) + public final boolean completed(BasicStream is) { // // NOTE: this method is called from ConnectionI.parseMessage @@ -276,291 +211,153 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes // any user callbacks. // - assert (_proxy.ice_isTwoway()); // Can only be called for twoways. - + assert(_proxy.ice_isTwoway()); // Can only be called for twoways. + + if(_childObserver != null) + { + _childObserver.reply(is.size() - Protocol.headerSize - 4); + _childObserver.detach(); + _childObserver = null; + } + byte replyStatus; try { - synchronized(_monitor) + // _is can already be initialized if the invocation is retried + if(_is == null) + { + _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); + } + _is.swap(is); + replyStatus = _is.readByte(); + + switch(replyStatus) + { + case ReplyStatus.replyOK: + { + break; + } + + case ReplyStatus.replyUserException: { - assert (_exception == null && (_state & StateDone) == 0); - if(_childObserver != null) + if(_observer != null) { - _childObserver.reply(is.size() - Protocol.headerSize - 4); - _childObserver.detach(); - _childObserver = null; + _observer.userException(); } - - if(_timeoutRequestHandler != null) + break; + } + + case ReplyStatus.replyObjectNotExist: + case ReplyStatus.replyFacetNotExist: + case ReplyStatus.replyOperationNotExist: + { + Ice.Identity id = new Ice.Identity(); + id.__read(_is); + + // + // For compatibility with the old FacetPath. + // + String[] facetPath = _is.readStringSeq(); + String facet; + if(facetPath.length > 0) { - _future.cancel(false); - _future = null; - _timeoutRequestHandler = null; + if(facetPath.length > 1) + { + throw new Ice.MarshalException(); + } + facet = facetPath[0]; } - - // _is can already be initialized if the invocation is retried - if(_is == null) + else { - _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); + facet = ""; } - _is.swap(is); - replyStatus = _is.readByte(); - + + String operation = _is.readString(); + + Ice.RequestFailedException ex = null; switch(replyStatus) { - case ReplyStatus.replyOK: - { - break; - } - - case ReplyStatus.replyUserException: - { - if(_observer != null) - { - _observer.userException(); - } - break; - } - - case ReplyStatus.replyObjectNotExist: - case ReplyStatus.replyFacetNotExist: - case ReplyStatus.replyOperationNotExist: - { - Ice.Identity id = new Ice.Identity(); - id.__read(_is); - - // - // For compatibility with the old FacetPath. - // - String[] facetPath = _is.readStringSeq(); - String facet; - if(facetPath.length > 0) - { - if(facetPath.length > 1) - { - throw new Ice.MarshalException(); - } - facet = facetPath[0]; - } - else - { - facet = ""; - } - - String operation = _is.readString(); - - Ice.RequestFailedException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } - - case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - - case ReplyStatus.replyOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } - - default: - { - assert (false); - break; - } - } - - ex.id = id; - ex.facet = facet; - ex.operation = operation; - throw ex; - } - - case ReplyStatus.replyUnknownException: - case ReplyStatus.replyUnknownLocalException: - case ReplyStatus.replyUnknownUserException: - { - String unknown = _is.readString(); - - Ice.UnknownException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } - - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } - - case ReplyStatus.replyUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } - - default: - { - assert (false); - break; - } - } - - ex.unknown = unknown; - throw ex; - } + case ReplyStatus.replyObjectNotExist: + { + ex = new Ice.ObjectNotExistException(); + break; + } - default: - { - throw new Ice.UnknownReplyStatusException(); - } + case ReplyStatus.replyFacetNotExist: + { + ex = new Ice.FacetNotExistException(); + break; } - if(replyStatus == ReplyStatus.replyOK) + case ReplyStatus.replyOperationNotExist: { - _state |= StateOK; + ex = new Ice.OperationNotExistException(); + break; } - _state |= StateDone; - _monitor.notifyAll(); - if(_callback == null) + default: { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return false; + assert(false); + break; } - return true; - } - } - catch(Ice.Exception exc) - { - // - // We don't call finished(exc) here because we don't want - // to invoke the completion callback. The completion - // callback is invoked by the connection is this method - // returns true. - // - try - { - handleException(exc); - return false; + } + + ex.id = id; + ex.facet = facet; + ex.operation = operation; + throw ex; } - catch(Ice.LocalException ex) + + case ReplyStatus.replyUnknownException: + case ReplyStatus.replyUnknownLocalException: + case ReplyStatus.replyUnknownUserException: { - synchronized(_monitor) - { - _state |= StateDone; - _exception = ex; - _monitor.notifyAll(); + String unknown = _is.readString(); - if(_callback == null) - { - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return false; - } - return true; + Ice.UnknownException ex = null; + switch(replyStatus) + { + case ReplyStatus.replyUnknownException: + { + ex = new Ice.UnknownException(); + break; } - } - } - } - public final boolean invoke(boolean userThread) - { - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _state |= StateDone | StateOK; - _handler.finishBatchRequest(_os); - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - return true; - } + case ReplyStatus.replyUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } - while(true) - { - try - { - _sent = false; - _handler = _proxy.__getRequestHandler(); - int status = _handler.sendAsyncRequest(this); - if((status & AsyncStatus.Sent) > 0) + case ReplyStatus.replyUnknownUserException: { - if(userThread) - { - _sentSynchronously = true; - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - invokeSent(); // Call from the user thread. - } - } - else - { - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - // Call from a client thread pool thread. - invokeSentAsync(); - } - } + ex = new Ice.UnknownUserException(); + break; } - if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0) + default: { - synchronized(_monitor) - { - if((_state & StateDone) == 0) - { - int invocationTimeout = _handler.getReference().getInvocationTimeout(); - if(invocationTimeout > 0) - { - _future = _instance.timer().schedule(new Runnable() - { - @Override - public void run() - { - timeout(); - } - }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); - _timeoutRequestHandler = _handler; - } - } - } + assert(false); + break; + } } + + ex.unknown = unknown; + throw ex; } - catch(RetryException ex) + + default: { - // Clear request handler and retry. - _proxy.__setRequestHandler(_handler, null); - continue; + throw new Ice.UnknownReplyStatusException(); } - catch(Ice.Exception ex) - { - // This will throw if the invocation can't be retried. - handleException(ex); } - break; + + return finished(replyStatus == ReplyStatus.replyOK); + } + catch(Ice.Exception ex) + { + return completed(ex); } - return _sentSynchronously; } public BasicStream startWriteParams(Ice.FormatType format) @@ -591,12 +388,48 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes } } + public IceInternal.BasicStream startReadParams() + { + _is.startReadEncaps(); + return _is; + } + + public void endReadParams() + { + _is.endReadEncaps(); + } + + public void readEmptyParams() + { + _is.skipEmptyEncaps(null); + } + + public byte[] readParamEncaps() + { + return _is.readEncaps(null); + } + + public final void throwUserException() + throws Ice.UserException + { + try + { + _is.startReadEncaps(); + _is.throwException(null); + } + catch(Ice.UserException ex) + { + _is.endReadEncaps(); + throw ex; + } + } + @Override public void cacheMessageBuffers() { if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) { - synchronized(_monitor) + synchronized(this) { if((_state & StateCachedBuffers) > 0) { @@ -612,76 +445,14 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes _os.reset(); _proxy.cacheMessageBuffers(_is, _os); - } - } - @Override - public void invokeExceptionAsync(final Ice.Exception ex) - { - if((_state & StateDone) == 0 && _handler != null) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _handler.abortBatchRequest(); - } - } - - super.invokeExceptionAsync(ex); - } - - @Override - protected void cancelRequest() - { - if(_handler != null) - { - _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); + _is = null; + _os = null; } } - private void handleException(Ice.Exception exc) - { - try - { - Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); - _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt); - if(_observer != null) - { - _observer.retried(); // Invocation is being retried. - } - - // - // Schedule the retry. Note that we always schedule the retry - // on the retry queue even if the invocation can be retried - // immediately. This is required because it might not be safe - // to retry from this thread (this is for instance called by - // finished(BasicStream) which is called with the connection - // locked. - // - _instance.retryQueue().add(this, interval.value); - } - catch(Ice.Exception ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - throw ex; - } - } - - final private Ice.ObjectPrxHelperBase _proxy; final private Ice.EncodingVersion _encoding; - - private RequestHandler _handler; - private int _cnt; - private Ice.OperationMode _mode; - private boolean _sent; + private BasicStream _is; // // If true this AMI request is being used for a generated synchronous invocation. diff --git a/java/src/IceInternal/OutgoingAsyncBase.java b/java/src/IceInternal/OutgoingAsyncBase.java index 2e6aa221f4d..f04b09ea941 100644 --- a/java/src/IceInternal/OutgoingAsyncBase.java +++ b/java/src/IceInternal/OutgoingAsyncBase.java @@ -9,385 +9,41 @@ package IceInternal; -import Ice.AsyncResult; -import Ice.Communicator; -import Ice.CommunicatorDestroyedException; -import Ice.Connection; -import Ice.ObjectPrx; -import Ice.UserException; - -/** - * An AsyncResult object is the return value of an asynchronous invocation. - * With this object, an application can obtain several attributes of the - * invocation and discover its outcome. - **/ -public abstract class OutgoingAsyncBase implements Ice.AsyncResult +// +// Base class for handling asynchronous invocations. This class is +// responsible for the handling of the output stream and the child +// invocation observer. +// +public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI { - protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op, - IceInternal.CallbackBase del) + public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { - _communicator = communicator; - _instance = instance; - _operation = op; - _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding); - _state = 0; - _sentSynchronously = false; - _exception = null; - _callback = del; - } - - protected OutgoingAsyncBase(Communicator communicator, Instance instance, String op, CallbackBase del, - BasicStream is, BasicStream os) - { - _communicator = communicator; - _instance = instance; - _operation = op; - _os = os; - _is = is; - _state = 0; - _sentSynchronously = false; - _exception = null; - _callback = del; - } - - /** - * Returns the communicator that sent the invocation. - * - * @return The communicator. - **/ - @Override - public Communicator getCommunicator() - { - return _communicator; - } - - /** - * Returns the connection that was used for the invocation. - * - * @return The connection. - **/ - @Override - public Connection getConnection() - { - return null; - } - - /** - * Returns the proxy that was used to call the <code>begin_</code> method. - * - * @return The proxy. - **/ - @Override - public ObjectPrx getProxy() - { - return null; - } - - /** - * Indicates whether the result of an invocation is available. - * - * @return True if the result is available, which means a call to the <code>end_</code> - * method will not block. The method returns false if the result is not yet available. - **/ - @Override - public final boolean isCompleted() - { - synchronized(_monitor) - { - return (_state & StateDone) > 0; - } + assert(false); // This should be overriden if this object is used with a request handler + return AsyncStatus.Queued; } - /** - * Blocks the caller until the result of the invocation is available. - **/ - @Override - public final void waitForCompleted() + public int invokeCollocated(CollocatedRequestHandler handler) { - synchronized(_monitor) - { - if(Thread.interrupted()) - { - throw new Ice.OperationInterruptedException(); - } - while((_state & StateDone) == 0) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - } - } + assert(false); // This should be overriden if this object is used with a request handler + return AsyncStatus.Queued; } - /** - * When you call the <code>begin_</code> method, the Ice run time attempts to - * write the corresponding request to the client-side transport. If the - * transport cannot accept the request, the Ice run time queues the request - * for later transmission. This method returns true if, at the time it is called, - * the request has been written to the local transport (whether it was initially - * queued or not). Otherwise, if the request is still queued, this method returns - * false. - * - * @return True if the request has been sent, or false if the request is queued. - **/ - @Override - public final boolean isSent() + public boolean sent() { - synchronized(_monitor) - { - return (_state & StateSent) > 0; - } + return sent(true); } - /** - * Blocks the caller until the request has been written to the client-side transport. - **/ - @Override - public final void waitForSent() + public boolean completed(Ice.Exception ex) { - synchronized(_monitor) - { - if(Thread.interrupted()) - { - throw new Ice.OperationInterruptedException(); - } - while((_state & StateSent) == 0 && _exception == null) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - } - } + return finished(ex); } - /** - * If the invocation failed with a local exception, throws the local exception. - **/ - @Override - public final void throwLocalException() + public final void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId) { - synchronized(_monitor) - { - if(_exception != null) - { - throw _exception; - } - } - } - - /** - * This method returns true if a request was written to the client-side - * transport without first being queued. If the request was initially - * queued, this method returns false (independent of whether the request - * is still in the queue or has since been written to the client-side transport). - * - * @return True if the request was sent without being queued, or false - * otherwise. - **/ - @Override - public final boolean sentSynchronously() - { - return _sentSynchronously; // No lock needed, immutable once __send() is called - } - - /** - * Returns the name of the operation. - * - * @return The operation name. - **/ - @Override - public final String getOperation() - { - return _operation; - } - - public final IceInternal.BasicStream getOs() - { - return _os; - } - - public IceInternal.BasicStream - startReadParams() - { - _is.startReadEncaps(); - return _is; - } - - public void - endReadParams() - { - _is.endReadEncaps(); - } - - public void - readEmptyParams() - { - _is.skipEmptyEncaps(null); - } - - public byte[] - readParamEncaps() - { - return _is.readEncaps(null); - } - - public final boolean __wait() - { - try - { - synchronized(_monitor) - { - if((_state & StateEndCalled) > 0) - { - throw new java.lang.IllegalArgumentException("end_ method called more than once"); - } - - _state |= StateEndCalled; - if(Thread.interrupted()) - { - throw new InterruptedException(); - } - while((_state & StateDone) == 0) - { - _monitor.wait(); - } - - if(_exception != null) - { - throw (Ice.Exception)_exception.fillInStackTrace(); - } - - return (_state & StateOK) > 0; - } - } - catch(InterruptedException ex) - { - // This must be called outside of the monitor as the - // invocation will potentially want to lock the - // connection (which in turn may want to lock the outgoing - // to notify that the message has been sent). - cancelRequest(); - throw new Ice.OperationInterruptedException(); - } - } - - public final void throwUserException() - throws UserException - { - try - { - _is.startReadEncaps(); - _is.throwException(null); - } - catch(UserException ex) - { - _is.endReadEncaps(); - throw ex; - } - } - - public void invokeExceptionAsync(final Ice.Exception ex) - { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) - { - @Override - public void - run() - { - invokeException(ex); - } - }); - } - catch(CommunicatorDestroyedException exc) - { - throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly. - } - } - - public final void invokeException(Ice.Exception ex) - { - synchronized(_monitor) - { - _state |= StateDone; - //_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception = ex; - _monitor.notifyAll(); - } - - invokeCompleted(); - } - - protected final void invokeSentInternal() - { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback != null) - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); - } - - try - { - _callback.__sent(this); - } - catch(RuntimeException ex) - { - warning(ex); - } - catch(AssertionError exc) - { - error(exc); - } - catch(OutOfMemoryError exc) - { - error(exc); - } - finally - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(null); - } - } - } - if(_observer != null) { - Ice.ObjectPrx proxy = getProxy(); - if(proxy == null || !proxy.ice_isTwoway()) - { - _observer.detach(); - } - } - } - - public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) - { - if(_observer != null) - { - _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); + final int size = _os.size() - IceInternal.Protocol.headerSize - 4; + _childObserver = getObserver().getRemoteObserver(info, endpt, requestId, size); if(_childObserver != null) { _childObserver.attach(); @@ -395,13 +51,12 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult } } - void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) + public final void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) { if(_observer != null) { - _childObserver = _observer.getCollocatedObserver(adapter, - requestId, - _os.size() - IceInternal.Protocol.headerSize - 4); + final int size = _os.size() - IceInternal.Protocol.headerSize - 4; + _childObserver = getObserver().getCollocatedObserver(adapter, requestId, size); if(_childObserver != null) { _childObserver.attach(); @@ -409,185 +64,49 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult } } - abstract void processRetry(); - - final protected void invokeSentAsync() - { - // - // This is called when it's not safe to call the sent callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) - { - @Override - public void run() - { - invokeSentInternal(); - } - }); - } - catch(CommunicatorDestroyedException exc) - { - } - } - - public static void check(AsyncResult r, ObjectPrx prx, String operation) - { - check(r, operation); - if(r.getProxy() != prx) - { - throw new IllegalArgumentException("Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); - } - } - - public static void check(AsyncResult r, Connection con, String operation) + public final IceInternal.BasicStream getOs() { - check(r, operation); - if(r.getConnection() != con) - { - throw new IllegalArgumentException("Connection for call to end_" + operation + - " does not match connection that was used to call corresponding begin_" + - operation + " method"); - } + return _os; } - public static void check(AsyncResult r, Communicator com, String operation) + protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del) { - check(r, operation); - if(r.getCommunicator() != com) - { - throw new IllegalArgumentException("Communicator for call to end_" + operation + - " does not match communicator that was used to call corresponding " + - "begin_" + operation + " method"); - } + super(com, instance, op, del); + _os = new BasicStream(instance, Protocol.currentProtocolEncoding); } - public void cacheMessageBuffers() + protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del, BasicStream os) { + super(com, instance, op, del); + _os = os; } - public final void invokeCompleted() + @Override + protected boolean sent(boolean done) { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback != null) + if(done) { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); - } - - try - { - _callback.__completed(this); - } - catch(RuntimeException ex) - { - warning(ex); - } - catch(AssertionError exc) - { - error(exc); - } - catch(OutOfMemoryError exc) - { - error(exc); - } - finally + if(_childObserver != null) { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(null); - } + _childObserver.detach(); + _childObserver = null; } } - - if(_observer != null) - { - _observer.detach(); - _observer = null; - } + return super.sent(done); } - protected void - timeout() - { - IceInternal.RequestHandler handler; - synchronized(_monitor) - { - handler = _timeoutRequestHandler; - _timeoutRequestHandler = null; - } - - if(handler != null) - { - handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this, - new Ice.InvocationTimeoutException()); - } - } - - private static void check(AsyncResult r, String operation) - { - if(r == null) - { - throw new IllegalArgumentException("AsyncResult == null"); - } - else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality - { - throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " + - r.getOperation()); - } - } - - private final void warning(RuntimeException ex) + @Override + protected boolean finished(Ice.Exception ex) { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + if(_childObserver != null) { - String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex); - _instance.initializationData().logger.warning(s); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + _childObserver = null; } + return super.finished(ex); } - private final void error(Error error) - { - String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error); - _instance.initializationData().logger.error(s); - } - - abstract protected void cancelRequest(); - - protected Communicator _communicator; - protected IceInternal.Instance _instance; - protected String _operation; - protected Ice.Connection _cachedConnection; - - protected java.lang.Object _monitor = new java.lang.Object(); - protected IceInternal.BasicStream _is; - protected IceInternal.BasicStream _os; - - protected IceInternal.RequestHandler _timeoutRequestHandler; - protected java.util.concurrent.Future<?> _future; - - protected static final byte StateOK = 0x1; - protected static final byte StateDone = 0x2; - protected static final byte StateSent = 0x4; - protected static final byte StateEndCalled = 0x8; - protected static final byte StateCachedBuffers = 0x10; - - protected byte _state; - protected boolean _sentSynchronously; - protected Ice.Exception _exception; - - protected Ice.Instrumentation.InvocationObserver _observer; + protected BasicStream _os; protected Ice.Instrumentation.ChildInvocationObserver _childObserver; - - protected IceInternal.CallbackBase _callback; } diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java deleted file mode 100644 index 7b1f5972434..00000000000 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ /dev/null @@ -1,53 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -// -// This interface is used by the connection to handle OutgoingAsync -// and BatchOutgoingAsync messages. -// -public interface OutgoingAsyncMessageCallback -{ - // - // Called by the request handler to send the request over the connection. - // - int send(Ice.ConnectionI conection, boolean compress, boolean response) - throws RetryException; - - // - // Called by the collocated request handler to invoke the request. - // - int invokeCollocated(CollocatedRequestHandler handler); - - // - // Called by the connection when the message is confirmed sent. The - // connection is locked when this is called so this method can't call the - // sent callback. Instead, this method returns true if there's a sent - // callback and false otherwise. If true is returned, the connection will - // call the __invokeSent() method bellow (which in turn should call the sent - // callback). - // - boolean sent(); - - // - // Called by the connection to call the user sent callback. - // - void invokeSent(); - - // - // Called by the connection when the request failed. - // - void finished(Ice.Exception ex); - - // - // Helper to dispatch the cancellation exception. - // - void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection); -} diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 534888ec89e..116038ad537 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -265,7 +265,7 @@ public final class OutgoingConnectionFactory } public void - flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) + flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) { java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>(); diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java deleted file mode 100644 index 4a575142da7..00000000000 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ /dev/null @@ -1,97 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync -{ - public ProxyBatchOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback) - { - super(prx.ice_getCommunicator(), prx.__reference().getInstance(), operation, callback); - _proxy = prx; - _observer = ObserverHelper.get(prx, operation); - } - - public void __invoke() - { - Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol()); - - try - { - _handler = _proxy.__getRequestHandler(); - int status = _handler.sendAsyncRequest(this); - if((status & AsyncStatus.Sent) > 0) - { - _sentSynchronously = true; - if((status & AsyncStatus.InvokeSentCallback) > 0) - { - invokeSent(); - } - } - else - { - synchronized(_monitor) - { - if((_state & StateDone) == 0) - { - int invocationTimeout = _handler.getReference().getInvocationTimeout(); - if(invocationTimeout > 0) - { - _future = _instance.timer().schedule(new Runnable() - { - @Override - public void run() - { - timeout(); - } - }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); - _timeoutRequestHandler = _handler; - } - } - } - } - } - catch(RetryException ex) - { - // - // Clear request handler but don't retry or throw. Retrying - // isn't useful, there were no batch requests associated with - // the proxy's request handler. - // - _proxy.__setRequestHandler(_handler, null); - } - catch(Ice.Exception ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - _proxy.__setRequestHandler(_handler, null); // Clear request handler - throw ex; // Throw to notify the user lthat batch requests were potentially lost. - } - } - - @Override - public Ice.ObjectPrx getProxy() - { - return _proxy; - } - - @Override - protected void cancelRequest() - { - if(_handler != null) - { - _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException()); - } - } - - final private Ice.ObjectPrxHelperBase _proxy; - private RequestHandler _handler = null; -} diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java index fb664c89b9b..a5165c37374 100644 --- a/java/src/IceInternal/ProxyFactory.java +++ b/java/src/IceInternal/ProxyFactory.java @@ -217,7 +217,7 @@ public final class ProxyFactory // // Don't retry invocation timeouts. // - if(ex instanceof Ice.InvocationTimeoutException) + if(ex instanceof Ice.InvocationTimeoutException || ex instanceof Ice.InvocationCanceledException) { throw ex; } diff --git a/java/src/IceInternal/ProxyFlushBatch.java b/java/src/IceInternal/ProxyFlushBatch.java new file mode 100644 index 00000000000..d3708dc67e3 --- /dev/null +++ b/java/src/IceInternal/ProxyFlushBatch.java @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class ProxyFlushBatch extends ProxyOutgoingAsyncBase +{ + public static ProxyFlushBatch check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation) + { + ProxyOutgoingAsyncBase.checkImpl(r, prx, operation); + try + { + return (ProxyFlushBatch)r; + } + catch(ClassCastException ex) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + } + + public ProxyFlushBatch(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback) + { + super(prx, operation, callback); + _observer = ObserverHelper.get(prx, operation); + } + + @Override + public boolean sent() + { + return sent(true); // Overriden because the flush is done even if using a two-way proxy. + } + + @Override + public int send(Ice.ConnectionI connection, boolean compress, boolean response) + { + _cachedConnection = connection; + return connection.flushAsyncBatchRequests(this); + } + + @Override + public int invokeCollocated(CollocatedRequestHandler handler) + { + return handler.invokeAsyncBatchRequests(this); + } + + public void invoke() + { + Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); + invokeImpl(true); // userThread = true + } + + @Override + protected void handleRetryException(RetryException exc) + { + _proxy.__setRequestHandler(_handler, null); // Clear request handler + throw exc.get(); // No retries, we want to notify the user of potentially lost batch requests + } + + @Override + protected int handleException(Ice.Exception exc) + { + _proxy.__setRequestHandler(_handler, null); // Clear request handler + throw exc; // No retries, we want to notify the user of potentially lost batch requests + } +} diff --git a/java/src/IceInternal/ProxyGetConnection.java b/java/src/IceInternal/ProxyGetConnection.java new file mode 100644 index 00000000000..9f5388218db --- /dev/null +++ b/java/src/IceInternal/ProxyGetConnection.java @@ -0,0 +1,59 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class ProxyGetConnection extends ProxyOutgoingAsyncBase +{ + public static ProxyGetConnection check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation) + { + ProxyOutgoingAsyncBase.checkImpl(r, prx, operation); + try + { + return (ProxyGetConnection)r; + } + catch(ClassCastException ex) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + } + + public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase cb) + { + super(prx, operation, cb); + _observer = ObserverHelper.get(prx, operation); + } + + @Override + public int send(Ice.ConnectionI connection, boolean compress, boolean response) + throws RetryException + { + _cachedConnection = connection; + if(finished(true)) + { + invokeCompletedAsync(); + } + return AsyncStatus.Sent; + } + + @Override + public int invokeCollocated(CollocatedRequestHandler handler) + { + if(finished(true)) + { + invokeCompletedAsync(); + } + return AsyncStatus.Sent; + } + + public void invoke() + { + invokeImpl(true); // userThread = true + } +} diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java new file mode 100644 index 00000000000..eca0de9fcf6 --- /dev/null +++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java @@ -0,0 +1,278 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +// +// Base class for proxy based invocations. This class handles the +// retry for proxy invocations. It also ensures the child observer is +// correct notified of failures and make sure the retry task is +// correctly canceled when the invocation completes. +// +public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase +{ + public static ProxyOutgoingAsyncBase check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation) + { + ProxyOutgoingAsyncBase.checkImpl(r, prx, operation); + try + { + return (ProxyOutgoingAsyncBase)r; + } + catch(ClassCastException ex) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + } + + @Override + public Ice.ObjectPrx getProxy() + { + return _proxy; + } + + @Override + public boolean sent() + { + return sent(!_proxy.ice_isTwoway()); + } + + @Override + public boolean completed(Ice.Exception exc) + { + if(_childObserver != null) + { + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); + _childObserver = null; + } + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // + try + { + _instance.retryQueue().add(this, handleException(exc)); + return false; + } + catch(Ice.Exception ex) + { + return finished(ex); // No retries, we're done + } + } + + public void retry() + { + invokeImpl(false); + } + + public void abort(Ice.Exception ex) + { + assert(_childObserver == null); + if(finished(ex)) + { + invokeCompletedAsync(); + } + else if(ex instanceof Ice.CommunicatorDestroyedException) + { + // + // If it's a communicator destroyed exception, don't swallow + // it but instead notify the user thread. Even if no callback + // was provided. + // + throw ex; + } + } + + protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate) + { + super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate); + _proxy = prx; + _mode = Ice.OperationMode.Normal; + _cnt = 0; + _sent = false; + } + + protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate, BasicStream os) + { + super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate, os); + _proxy = prx; + _mode = Ice.OperationMode.Normal; + _cnt = 0; + _sent = false; + } + + protected static Ice.AsyncResult checkImpl(Ice.AsyncResult r, Ice.ObjectPrx p, String operation) + { + check(r, operation); + if(r.getProxy() != p) + { + throw new IllegalArgumentException("Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding " + + "begin_" + operation + " method"); + } + return r; + } + + protected void invokeImpl(boolean userThread) + { + try + { + if(userThread) + { + int invocationTimeout = _proxy.__reference().getInvocationTimeout(); + if(invocationTimeout > 0) + { + _future = _instance.timer().schedule( + new Runnable() + { + @Override + public void run() + { + cancel(new Ice.InvocationTimeoutException()); + } + }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); + } + } + else // If not called from the user thread, it's called from the retry queue + { + checkCanceled(); // Cancellation exception aren't retriable + if(_observer != null) + { + _observer.retried(); + } + } + + while(true) + { + try + { + _sent = false; + _handler = _proxy.__getRequestHandler(); + int status = _handler.sendAsyncRequest(this); + if((status & AsyncStatus.Sent) > 0) + { + if(userThread) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + invokeSent(); // Call the sent callback from the user thread. + } + } + else + { + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + invokeSentAsync(); // Call the sent callback from a client thread pool thread. + } + } + } + return; // We're done! + } + catch(RetryException ex) + { + handleRetryException(ex); + } + catch(Ice.Exception ex) + { + if(_childObserver != null) + { + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + _childObserver = null; + } + final int interval = handleException(ex); + if(interval > 0) + { + _instance.retryQueue().add(this, interval); + return; + } + else if(_observer != null) + { + checkCanceled(); + _observer.retried(); + } + } + } + } + catch(Ice.Exception ex) + { + // + // If called from the user thread we re-throw, the exception + // will be catch by the caller and abort() will be called. + // + if(userThread) + { + throw ex; + } + else if(finished(ex)) // No retries, we're done + { + invokeCompletedAsync(); + } + } + } + + @Override + protected boolean sent(boolean done) + { + _sent = true; + if(done) + { + if(_future != null) + { + _future.cancel(false); + _future = null; + } + } + return super.sent(done); + } + + @Override + protected boolean finished(Ice.Exception ex) + { + if(_future != null) + { + _future.cancel(false); + _future = null; + } + return super.finished(ex); + } + + @Override + protected boolean finished(boolean ok) + { + if(_future != null) + { + _future.cancel(false); + _future = null; + } + return super.finished(ok); + } + + protected void handleRetryException(RetryException exc) + { + _proxy.__setRequestHandler(_handler, null); // Clear request handler and always retry. + } + + protected int handleException(Ice.Exception exc) + { + Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); + _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt); + return interval.value; + } + + final protected Ice.ObjectPrxHelperBase _proxy; + protected RequestHandler _handler; + protected Ice.OperationMode _mode; + + private java.util.concurrent.Future<?> _future; + private int _cnt; + private boolean _sent; +} diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index 5f40ea37403..fdb7a672e2a 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -124,7 +124,7 @@ public class QueueRequestHandler implements RequestHandler @Override public int - sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException + sendAsyncRequest(final OutgoingAsyncBase out) throws RetryException { try { @@ -148,15 +148,16 @@ public class QueueRequestHandler implements RequestHandler } @Override - public boolean - asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex) + public void + asyncRequestCanceled(final OutgoingAsyncBase outAsync, final Ice.LocalException ex) { - return performCallable(new Callable<Boolean>() + performCallable(new Callable<Void>() { @Override - public Boolean call() + public Void call() { - return _delegate.asyncRequestCanceled(outAsync, ex); + _delegate.asyncRequestCanceled(outAsync, ex); + return null; } }); } diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index 1e0e4a7909a..f87b215919f 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -9,7 +9,7 @@ package IceInternal; -public interface RequestHandler +public interface RequestHandler extends CancellationHandler { RequestHandler connect(); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); @@ -19,11 +19,9 @@ public interface RequestHandler void finishBatchRequest(BasicStream out); void abortBatchRequest(); - int sendAsyncRequest(OutgoingAsyncMessageCallback out) + int sendAsyncRequest(OutgoingAsyncBase out) throws RetryException; - boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex); - Reference getReference(); Ice.ConnectionI getConnection(); diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java index 2c322d1cb72..82831529545 100644 --- a/java/src/IceInternal/RetryQueue.java +++ b/java/src/IceInternal/RetryQueue.java @@ -16,8 +16,7 @@ public class RetryQueue _instance = instance; } - synchronized public void - add(OutgoingAsyncBase outAsync, int interval) + synchronized public void add(ProxyOutgoingAsyncBase outAsync, int interval) { if(_instance == null) { @@ -26,10 +25,10 @@ public class RetryQueue RetryTask task = new RetryTask(this, outAsync); task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS)); _requests.add(task); + outAsync.cancelable(task); } - synchronized public void - destroy() + synchronized public void destroy() { java.util.HashSet<RetryTask> keep = new java.util.HashSet<RetryTask>(); for(RetryTask task : _requests) @@ -65,14 +64,14 @@ public class RetryQueue } } - synchronized void - remove(RetryTask task) + synchronized boolean remove(RetryTask task) { - _requests.remove(task); + boolean removed = _requests.remove(task); if(_instance == null && _requests.isEmpty()) { notify(); } + return removed; } private Instance _instance; diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java index 64a54bd45b5..974dc998a79 100644 --- a/java/src/IceInternal/RetryTask.java +++ b/java/src/IceInternal/RetryTask.java @@ -9,26 +9,18 @@ package IceInternal; -class RetryTask implements Runnable +class RetryTask implements Runnable, CancellationHandler { - RetryTask(RetryQueue queue, OutgoingAsyncBase outAsync) + RetryTask(RetryQueue queue, ProxyOutgoingAsyncBase outAsync) { _queue = queue; _outAsync = outAsync; } @Override - public void - run() + public void run() { - try - { - _outAsync.processRetry(); - } - catch(Ice.LocalException ex) - { - _outAsync.invokeExceptionAsync(ex); - } + _outAsync.retry(); // // NOTE: this must be called last, destroy() blocks until all task @@ -39,12 +31,32 @@ class RetryTask implements Runnable _queue.remove(this); } - public boolean - destroy() + @Override + public void asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex) + { + if(_queue.remove(this) && _future.cancel(false)) + { + // + // We just retry the outgoing async now rather than marking it + // as finished. The retry will check for the cancellation + // exception and terminate appropriately the request. + // + _outAsync.retry(); + } + } + + public boolean destroy() { if(_future.cancel(false)) { - _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException()); + try + { + _outAsync.abort(new Ice.CommunicatorDestroyedException()); + } + catch(Ice.CommunicatorDestroyedException ex) + { + // Abort can throw if there's no callback, just ignore in this case + } return true; } return false; @@ -56,6 +68,6 @@ class RetryTask implements Runnable } private final RetryQueue _queue; - private final OutgoingAsyncBase _outAsync; + private final ProxyOutgoingAsyncBase _outAsync; private java.util.concurrent.Future<?> _future; } |