diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 163 |
1 files changed, 49 insertions, 114 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 699477d4607..41c56076a94 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -21,7 +21,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, boolean explicitCtx) { - _delegate = null; + _handler = null; _cnt = 0; _mode = mode; _sentSynchronously = false; @@ -101,11 +101,17 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa public int __send(Ice.ConnectionI connection, boolean compress, boolean response) - throws LocalExceptionWrapper + throws RetryException { return connection.sendAsyncRequest(this, compress, response); } + public int + __invokeCollocated(CollocatedRequestHandler handler) + { + return handler.invokeAsyncRequest(this); + } + public boolean __sent() { @@ -129,7 +135,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _timeoutRequestHandler = null; } _state |= Done | OK; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation + //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization } _monitor.notifyAll(); return !alreadySent; // Don't call the sent call is already sent. @@ -143,7 +149,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } public void - __finished(Ice.LocalException exc, boolean sent) + __finished(Ice.Exception exc, boolean sent) { synchronized(_monitor) { @@ -165,56 +171,16 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. // - - try - { - int interval = handleException(exc, sent); // This will throw if the invocation can't be retried. - if(interval > 0) - { - _instance.retryQueue().add(this, interval); - } - else - { - __invoke(false); - } - } - catch(Ice.LocalException ex) - { - __invokeException(ex); - } - } - - public final void - __finished(LocalExceptionWrapper exc) - { - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. The LocalExceptionWrapper exception is only called - // before the invocation is sent. - // - - if(_remoteObserver != null) - { - _remoteObserver.failed(exc.get().ice_name()); - _remoteObserver.detach(); - _remoteObserver = null; - } - - assert(_timeoutRequestHandler == null); - try { - int interval = handleException(exc); // This will throw if the invocation can't be retried. - if(interval > 0) - { - _instance.retryQueue().add(this, interval); - } - else + if(!handleException(exc, sent)) { - __invoke(false); + return; // Can't be retried immediately. } + + __invoke(false); // Retry the invocation } - catch(Ice.LocalException ex) + catch(Ice.Exception ex) { __invokeException(ex); } @@ -396,12 +362,10 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { while(true) { - int interval = 0; try { - _delegate = _proxy.__getDelegate(true); - RequestHandler handler = _delegate.__getRequestHandler(); - int status = handler.sendAsyncRequest(this); + _handler = _proxy.__getRequestHandler(true); + int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { if(synchronous) @@ -427,30 +391,27 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { if((_state & Done) == 0) { - int invocationTimeout = handler.getReference().getInvocationTimeout(); + int invocationTimeout = _handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { _instance.timer().schedule(this, invocationTimeout); - _timeoutRequestHandler = handler; + _timeoutRequestHandler = _handler; } } } } break; } - catch(LocalExceptionWrapper ex) - { - interval = handleException(ex); - } - catch(Ice.LocalException ex) + catch(RetryException ex) { - interval = handleException(ex, false); + _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. } - - if(interval > 0) + catch(Ice.Exception ex) { - _instance.retryQueue().add(this, interval); - return false; + if(!handleException(ex, false)) // This will throw if the invocation can't be retried. + { + break; // Can't be retried immediately. + } } } return _sentSynchronously; @@ -488,78 +449,52 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } + BasicStream + __getIs() + { + return _is; + } + public void runTimerTask() { __runTimerTask(); } - private int - handleException(Ice.LocalException exc, boolean sent) + private boolean + handleException(Ice.Exception exc, boolean sent) { - Ice.IntHolder interval = new Ice.IntHolder(0); try { - // - // A CloseConnectionException indicates graceful server shutdown, and is therefore - // always repeatable without violating "at-most-once". That's because by sending a - // close connection message, the server guarantees that all outstanding requests - // can safely be repeated. - // - // An ObjectNotExistException can always be retried as well without violating - // "at-most-once" (see the implementation of the checkRetryAfterException method of - // the ProxyFactory class for the reasons why it can be useful). - // - if(!sent || - exc instanceof Ice.CloseConnectionException || - exc instanceof Ice.ObjectNotExistException) + Ice.IntHolder interval = new Ice.IntHolder(); + _cnt = _proxy.__handleException(exc, _handler, _mode, sent, interval, _cnt); + if(_observer != null) { - throw exc; + _observer.retried(); // Invocation is being retried. } - - // - // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the - // request cannot be resent without potentially violating the "at-most-once" - // principle. - // - throw new LocalExceptionWrapper(exc, false); - } - catch(LocalExceptionWrapper ex) - { - if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + if(interval.value > 0) { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer); + _instance.retryQueue().add(this, interval.value); + return false; // Don't retry immediately, the retry queue will take care of the retry. } else { - _proxy.__handleExceptionWrapper(_delegate, ex, _observer); + return true; // Retry immediately. } } - catch(Ice.LocalException ex) + catch(Ice.Exception ex) { - _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt, _observer); - } - return interval.value; - } - - private int - handleException(LocalExceptionWrapper ex) - { - Ice.IntHolder interval = new Ice.IntHolder(0); - if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) - { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer); - } - else - { - _proxy.__handleExceptionWrapper(_delegate, ex, _observer); + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + throw ex; } - return interval.value; } protected Ice.ObjectPrxHelperBase _proxy; - private Ice._ObjectDel _delegate; + private RequestHandler _handler; private Ice.EncodingVersion _encoding; private int _cnt; private Ice.OperationMode _mode; |