diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 395 |
1 files changed, 157 insertions, 238 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 184355e8e06..fcf104ed942 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,35 +9,22 @@ package IceInternal; -public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback +public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback { - public - OutgoingAsync() - { - } - - public abstract void ice_exception(Ice.LocalException ex); - - public final BasicStream - __os() - { - return __os; - } - public final void __sent(final Ice.ConnectionI connection) { - synchronized(_monitor) + synchronized(__monitor) { _sent = true; if(!_proxy.ice_isTwoway()) { - cleanup(); // No response expected, we're done with the OutgoingAsync. + __release(); } else if(_response) { - _monitor.notifyAll(); // If the response was already received notify finished() which is waiting. + __monitor.notifyAll(); // If the response was already received notify finished() which is waiting. } else if(connection.timeout() >= 0) { @@ -63,7 +50,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback byte replyStatus; try { - synchronized(_monitor) + synchronized(__monitor) { assert(__os != null); _response = true; @@ -77,7 +64,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback { try { - _monitor.wait(); + __monitor.wait(); } catch(java.lang.InterruptedException ex) { @@ -216,14 +203,8 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback } catch(java.lang.Exception ex) { - warning(ex); - } - finally - { - synchronized(_monitor) - { - cleanup(); - } + __warning(ex); + __release(); } } @@ -231,8 +212,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback public final void __finished(Ice.LocalException exc) { - boolean retry = false; - synchronized(_monitor) + synchronized(__monitor) { if(__os != null) // Might be called from __prepare or before __prepare { @@ -245,236 +225,213 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback { try { - _monitor.wait(); + __monitor.wait(); } catch(java.lang.InterruptedException ex) { } } - - // - // A CloseConnectionException indicates graceful - // server shutdown, and is therefore always repeatable - // without violating "at-most-once". That's because by - // sending a close connection message, the server - // guarantees that all outstanding requests can safely - // be repeated. Otherwise, we can also retry if the - // operation mode is Nonmutating or Idempotent. - // - // An ObjectNotExistException can always be retried as - // well without violating "at-most-once". - // - if(!_sent || - _mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent || - exc instanceof Ice.CloseConnectionException || exc instanceof Ice.ObjectNotExistException) - { - retry = true; - } - } - } - - if(retry) - { - try - { - _cnt = _proxy.__handleException(_delegate, exc, _cnt); - __send(); - return; - } - catch(Ice.LocalException ex) - { } } + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // try { - ice_exception(exc); - } - catch(java.lang.Exception ex) - { - warning(ex); + handleException(exc); // This will throw if the invocation can't be retried. + __send(); } - finally + catch(Ice.LocalException ex) { - synchronized(_monitor) - { - cleanup(); - } + __exception(ex); } } public final void __finished(LocalExceptionWrapper ex) { + assert(__os != null && !_sent); + // - // NOTE: This is called if sendRequest/sendAsyncRequest fails with - // a LocalExceptionWrapper exception. It's not possible for the - // timer to be set at this point because the request couldn't be - // sent. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. The LocalExceptionWrapper exception is only called + // before the invocation is sent. // - assert(!_sent && _timerTask == null); try { - if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) - { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); - } - else - { - _proxy.__handleExceptionWrapper(_delegate, ex); - } + handleException(ex); // This will throw if the invocation can't be retried. __send(); } catch(Ice.LocalException exc) { - try - { - ice_exception(exc); - } - catch(java.lang.Exception exl) + __exception(exc); + } + } + + protected final void + __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context) + { + assert(__os != null); + + _proxy = (Ice.ObjectPrxHelperBase)prx; + _delegate = null; + _cnt = 0; + _mode = mode; + + // + // Can't call async via a batch proxy. + // + if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) + { + throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); + } + + __os.writeBlob(IceInternal.Protocol.requestHdr); + + Reference ref = _proxy.__reference(); + + ref.getIdentity().__write(__os); + + // + // For compatibility with the old FacetPath. + // + String facet = ref.getFacet(); + if(facet == null || facet.length() == 0) + { + __os.writeStringSeq(null); + } + else + { + String[] facetPath = { facet }; + __os.writeStringSeq(facetPath); + } + + __os.writeString(operation); + + __os.writeByte((byte)mode.value()); + + if(context != null) + { + // + // Explicit context + // + Ice.ContextHelper.write(__os, context); + } + else + { + // + // Implicit context + // + Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); + java.util.Map prxContext = ref.getContext(); + + if(implicitContext == null) { - warning(exl); + Ice.ContextHelper.write(__os, prxContext); } - finally + else { - synchronized(_monitor) - { - cleanup(); - } + implicitContext.write(prxContext, __os); } } + + __os.startWriteEncaps(); } protected final void - __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context) + __send() { - synchronized(_monitor) + while(true) { try { - // - // We must first wait for other requests to finish. - // - while(__os != null) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - } - } - - // - // Can't call async via a batch proxy. - // - _proxy = (Ice.ObjectPrxHelperBase)prx; - if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) - { - throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); - } - - _delegate = null; - _cnt = 0; - _mode = mode; _sent = false; - _response = false; - - Reference ref = _proxy.__reference(); - assert(__is == null); - __is = new BasicStream(ref.getInstance()); - assert(__os == null); - __os = new BasicStream(ref.getInstance()); - - __os.writeBlob(IceInternal.Protocol.requestHdr); - - ref.getIdentity().__write(__os); - - // - // For compatibility with the old FacetPath. - // - String facet = ref.getFacet(); - if(facet == null || facet.length() == 0) - { - __os.writeStringSeq(null); - } - else - { - String[] facetPath = { facet }; - __os.writeStringSeq(facetPath); - } - - __os.writeString(operation); - - __os.writeByte((byte)mode.value()); - - if(context != null) - { - // - // Explicit context - // - Ice.ContextHelper.write(__os, context); - } - else - { - // - // Implicit context - // - Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); - java.util.Map prxContext = ref.getContext(); - - if(implicitContext == null) - { - Ice.ContextHelper.write(__os, prxContext); - } - else - { - implicitContext.write(prxContext, __os); - } - } - - __os.startWriteEncaps(); + _response = false; + _delegate = _proxy.__getDelegate(true); + _delegate.__getRequestHandler().sendAsyncRequest(this); + return; + } + catch(LocalExceptionWrapper ex) + { + handleException(ex); } catch(Ice.LocalException ex) { - cleanup(); - throw ex; + handleException(ex); } } } - protected final void - __send() + protected abstract void __response(boolean ok); + + private void + handleException(LocalExceptionWrapper ex) { - // - // NOTE: no synchronization needed. At this point, no other threads can be calling on this object. - // + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + { + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy.__handleExceptionWrapper(_delegate, ex); + } + } - RequestHandler handler; + private void + handleException(Ice.LocalException exc) + { try { - _delegate = _proxy.__getDelegate(true); - handler = _delegate.__getRequestHandler(); + // + // A CloseConnectionException indicates graceful + // server shutdown, and is therefore always repeatable + // without violating "at-most-once". That's because by + // sending a close connection message, the server + // guarantees that all outstanding requests can safely + // be repeated. + // + // An ObjectNotExistException can always be retried as + // well without violating "at-most-once". + // + if(!_sent || + exc instanceof Ice.CloseConnectionException || + exc instanceof Ice.ObjectNotExistException) + { + throw exc; + } + + // + // Throw the exception wrapped in a LocalExceptionWrapper, to + // indicate that the request cannot be resent without + // potentially violating the "at-most-once" principle. + // + throw new LocalExceptionWrapper(exc, false); + } + catch(LocalExceptionWrapper ex) + { + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + { + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy.__handleExceptionWrapper(_delegate, ex); + } } catch(Ice.LocalException ex) { - __finished(ex); - return; + _cnt = _proxy.__handleException(_delegate, ex, _cnt); } - - _sent = false; - _response = false; - handler.sendAsyncRequest(this); } - protected abstract void __response(boolean ok); - private final void __runTimerTask(Ice.ConnectionI connection) { - synchronized(_monitor) + synchronized(__monitor) { assert(_timerTask != null && _sent); // Can only be set once the request is sent. @@ -483,7 +440,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback connection = null; } _timerTask = null; - _monitor.notifyAll(); + __monitor.notifyAll(); } if(connection != null) @@ -492,49 +449,11 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback } } - private final void - warning(java.lang.Exception ex) - { - if(__os != null) // Don't print anything if cleanup() was already called. - { - Reference ref = _proxy.__reference(); - if(ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault( - "Ice.Warn.AMICallback", 1) > 0) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtil.OutputBase out = new IceUtil.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by AMI callback:\n"); - ex.printStackTrace(pw); - pw.flush(); - ref.getInstance().initializationData().logger.warning(sw.toString()); - } - } - } - - private final void - cleanup() - { - assert(_timerTask == null); - - __is = null; - __os = null; - - _monitor.notify(); - } - - protected BasicStream __is; - protected BasicStream __os; - private boolean _sent; private boolean _response; private Ice.ObjectPrxHelperBase _proxy; private Ice._ObjectDel _delegate; private int _cnt; private Ice.OperationMode _mode; - private TimerTask _timerTask; - - private final java.lang.Object _monitor = new java.lang.Object(); } |