diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-11-23 13:28:08 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-11-23 13:28:08 +0100 |
commit | 2c578015edcb36cdc0acd0227295de1dcca1b995 (patch) | |
tree | e163980b5dabb43a40089a29fdf8ff47a3e07f1c /java/src/IceInternal/OutgoingAsync.java | |
parent | no longer generating inspect method for each Ruby exception (diff) | |
download | ice-2c578015edcb36cdc0acd0227295de1dcca1b995.tar.bz2 ice-2c578015edcb36cdc0acd0227295de1dcca1b995.tar.xz ice-2c578015edcb36cdc0acd0227295de1dcca1b995.zip |
New AMI mapping
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 510 |
1 files changed, 246 insertions, 264 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 490b15a73ee..75f52715837 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,70 +9,212 @@ package IceInternal; -public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback +public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback { - public final void - __sent(final Ice.ConnectionI connection) + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase callback) { - synchronized(__monitor) + super(((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, callback); + _proxy = (Ice.ObjectPrxHelperBase)prx; + } + + public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, + boolean explicitCtx) + { + _delegate = null; + _cnt = 0; + _mode = mode; + _sentSynchronously = false; + + if(explicitCtx && ctx == null) + { + ctx = _emptyContext; + } + + // + // Can't call async via a batch proxy. + // + if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) + { + throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); + } + + _os.writeBlob(IceInternal.Protocol.requestHdr); + + Reference ref = _proxy.__reference(); + + ref.getIdentity().__write(_os); + + // + // For compatibility with the old FacetPath. + // + String facet = ref.getFacet(); + if(facet == null || facet.length() == 0) + { + _os.writeStringSeq(null); + } + else { - _sent = true; + String[] facetPath = { facet }; + _os.writeStringSeq(facetPath); + } + + _os.writeString(operation); + + _os.writeByte((byte)mode.ordinal()); - if(!_proxy.ice_isTwoway()) + if(ctx != null) + { + // + // Explicit context + // + Ice.ContextHelper.write(_os, ctx); + } + else + { + // + // Implicit context + // + Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); + java.util.Map<String, String> prxContext = ref.getContext(); + + if(implicitContext == null) { - __releaseCallback(); + Ice.ContextHelper.write(_os, prxContext); } - else if(_response) + else { - __monitor.notifyAll(); // If the response was already received notify finished() which is waiting. + implicitContext.write(prxContext, _os); } - else if(connection.timeout() >= 0) + } + + _os.startWriteEncaps(); + } + + public Ice.ObjectPrx getProxy() + { + return _proxy; + } + + public boolean __sent(final Ice.ConnectionI connection) + { + synchronized(_monitor) + { + boolean alreadySent = (_state & Sent) != 0; + _state |= Sent; + + if((_state & Done) == 0) { - assert(_timerTask == null); - _timerTask = new TimerTask() + if(!_proxy.ice_isTwoway()) { - public void - runTimerTask() - { - __runTimerTask(connection); - } - }; - _proxy.__reference().getInstance().timer().schedule(_timerTask, connection.timeout()); + _state |= Done | OK; + } + else if(connection.timeout() > 0) + { + assert(_timerTaskConnection == null && _timerTask == null); + _timerTaskConnection = connection; + _timerTask = new TimerTask() + { + public void + runTimerTask() + { + __runTimerTask(); + } + }; + _instance.timer().schedule(_timerTask, connection.timeout()); + } } + _monitor.notifyAll(); + return !alreadySent; // Don't call the sent call is already sent. } } - public final void - __finished(BasicStream is) + public void __sent() + { + __sentInternal(); + } + + public void __finished(Ice.LocalException exc, boolean sent) + { + synchronized(_monitor) + { + assert((_state & Done) == 0); + if(_timerTaskConnection != null) + { + _instance.timer().cancel(_timerTask); + _timerTaskConnection = null; + _timerTask = null; + } + } + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // + + try + { + int interval = handleException(exc, sent); // This will throw if the invocation can't be retried. + if(interval > 0) + { + _instance.retryQueue().add(this, interval); + } + else + { + __send(false); + } + } + catch(Ice.LocalException ex) + { + __exception(ex); + } + } + + public final void __finished(LocalExceptionWrapper exc) + { + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. The LocalExceptionWrapper exception is only called + // before the invocation is sent. + // + + try + { + int interval = handleException(exc); // This will throw if the invocation can't be retried. + if(interval > 0) + { + _instance.retryQueue().add(this, interval); + } + else + { + __send(false); + } + } + catch(Ice.LocalException ex) + { + __exception(ex); + } + } + + public final void __finished(BasicStream is) { assert(_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; try { - synchronized(__monitor) + synchronized(_monitor) { - assert(__os != null); - _response = true; + assert(_exception == null && (_state & Done) == 0); - if(_timerTask != null && _proxy.__reference().getInstance().timer().cancel(_timerTask)) + if(_timerTaskConnection != null) { - _timerTask = null; // Timer cancelled. + _instance.timer().cancel(_timerTask); + _timerTaskConnection = null; + _timerTask = null; } - while(!_sent || _timerTask != null) - { - try - { - __monitor.wait(); - } - catch(java.lang.InterruptedException ex) - { - } - } - - __is.swap(is); - replyStatus = __is.readByte(); + _is.swap(is); + replyStatus = _is.readByte(); switch(replyStatus) { @@ -87,12 +229,12 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback case ReplyStatus.replyOperationNotExist: { Ice.Identity id = new Ice.Identity(); - id.__read(__is); + id.__read(_is); // // For compatibility with the old FacetPath. // - String[] facetPath = __is.readStringSeq(); + String[] facetPath = _is.readStringSeq(); String facet; if(facetPath.length > 0) { @@ -107,7 +249,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback facet = ""; } - String operation = __is.readString(); + String operation = _is.readString(); Ice.RequestFailedException ex = null; switch(replyStatus) @@ -147,7 +289,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback case ReplyStatus.replyUnknownLocalException: case ReplyStatus.replyUnknownUserException: { - String unknown = __is.readString(); + String unknown = _is.readString(); Ice.UnknownException ex = null; switch(replyStatus) @@ -186,234 +328,61 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback throw new Ice.UnknownReplyStatusException(); } } + + _state |= Done; + if(replyStatus == ReplyStatus.replyOK) + { + _state |= OK; + } + _monitor.notifyAll(); } } catch(Ice.LocalException ex) { - __finished(ex); + __finished(ex, true); return; } assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - - try - { - __response(replyStatus == ReplyStatus.replyOK); - } - catch(java.lang.Exception ex) - { - __warning(ex); - __releaseCallback(); - } + __response(); } - - public final void - __finished(Ice.LocalException exc) + public final boolean __send(boolean synchronous) { - synchronized(__monitor) + while(true) { - if(__os != null) // Might be called from __prepare or before __prepare + int interval = 0; + try { - if(_timerTask != null && _proxy.__reference().getInstance().timer().cancel(_timerTask)) - { - _timerTask = null; // Timer cancelled. - } - - while(_timerTask != null) + _delegate = _proxy.__getDelegate(true); + boolean sent = _delegate.__getRequestHandler().sendAsyncRequest(this); + if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread. { - try - { - __monitor.wait(); - } - catch(java.lang.InterruptedException ex) - { - } + _sentSynchronously = sent; } + break; } - } - - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. - // - - try - { - handleException(exc); // This will throw if the invocation can't be retried. - } - catch(Ice.LocalException ex) - { - __exception(ex); - } - } - - public final void - __finished(LocalExceptionWrapper ex) - { - assert(__os != null && !_sent); - - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. The LocalExceptionWrapper exception is only called - // before the invocation is sent. - // - - try - { - handleException(ex); // This will throw if the invocation can't be retried. - } - catch(Ice.LocalException exc) - { - __exception(exc); - } - } - - public final void - __retry(int cnt, int interval) - { - // - // This method is called by the proxy to retry an invocation. It's safe to update - // the count here without synchronization, no other threads can access this object. - // - _cnt = cnt; - if(interval > 0) - { - assert(__os != null); - __os.instance().retryQueue().add(this, interval); - } - else - { - __send(); - } - } - - public final boolean - __send() - { - try - { - _sent = false; - _response = false; - _delegate = _proxy.__getDelegate(true); - _sentSynchronously = _delegate.__getRequestHandler().sendAsyncRequest(this); - } - catch(LocalExceptionWrapper ex) - { - handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously - } - catch(Ice.LocalException ex) - { - handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously - } - return _sentSynchronously; - } - - protected final void - __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map<String, String> context) - { - assert(__os != null); - - _proxy = (Ice.ObjectPrxHelperBase)prx; - _delegate = null; - _cnt = 0; - _mode = mode; - _sentSynchronously = false; - - // - // Can't call async via a batch proxy. - // - if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) - { - throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); - } - - __os.writeBlob(IceInternal.Protocol.requestHdr); - - Reference ref = _proxy.__reference(); - - ref.getIdentity().__write(__os); - - // - // For compatibility with the old FacetPath. - // - String facet = ref.getFacet(); - if(facet == null || facet.length() == 0) - { - __os.writeStringSeq(null); - } - else - { - String[] facetPath = { facet }; - __os.writeStringSeq(facetPath); - } - - __os.writeString(operation); - - __os.writeByte((byte)mode.ordinal()); - - if(context != null) - { - // - // Explicit context - // - Ice.ContextHelper.write(__os, context); - } - else - { - // - // Implicit context - // - Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext(); - java.util.Map<String, String> prxContext = ref.getContext(); - - if(implicitContext == null) + catch(LocalExceptionWrapper ex) { - Ice.ContextHelper.write(__os, prxContext); + interval = handleException(ex); } - else + catch(Ice.LocalException ex) { - implicitContext.write(prxContext, __os); + interval = handleException(ex, false); } - } - - __os.startWriteEncaps(); - } - protected abstract void __response(boolean ok); - - protected void - __throwUserException() - throws Ice.UserException - { - try - { - __is.startReadEncaps(); - __is.throwException(); - } - catch(Ice.UserException ex) - { - __is.endReadEncaps(); - throw ex; - } - } - - private void - handleException(LocalExceptionWrapper ex) - { - if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) - { - _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt); - } - else - { - _proxy.__handleExceptionWrapper(_delegate, ex, this); + if(interval > 0) + { + _instance.retryQueue().add(this, interval); + return false; + } } + return _sentSynchronously; } - private void - handleException(Ice.LocalException exc) + private int handleException(Ice.LocalException exc, boolean sent) { + Ice.IntHolder interval = new Ice.IntHolder(0); try { // @@ -426,7 +395,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback // "at-most-once" (see the implementation of the checkRetryAfterException method of // the ProxyFactory class for the reasons why it can be useful). // - if(!_sent || + if(!sent || exc instanceof Ice.CloseConnectionException || exc instanceof Ice.ObjectNotExistException) { @@ -444,32 +413,43 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback { if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) { - _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt); + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt); } else { - _proxy.__handleExceptionWrapper(_delegate, ex, this); + _proxy.__handleExceptionWrapper(_delegate, ex); } } catch(Ice.LocalException ex) { - _proxy.__handleException(_delegate, ex, this, _cnt); + _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt); } + return interval.value; } - private final void - __runTimerTask(Ice.ConnectionI connection) + private int handleException(LocalExceptionWrapper ex) { - synchronized(__monitor) + Ice.IntHolder interval = new Ice.IntHolder(0); + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) + { + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt); + } + else { - assert(_timerTask != null && _sent); // Can only be set once the request is sent. + _proxy.__handleExceptionWrapper(_delegate, ex); + } + return interval.value; + } - if(_response) // If the response was just received, don't close the connection. - { - connection = null; - } + private final void + __runTimerTask() + { + Ice.ConnectionI connection; + synchronized(_monitor) + { + connection = _timerTaskConnection; + _timerTaskConnection = null; _timerTask = null; - __monitor.notifyAll(); } if(connection != null) @@ -478,12 +458,14 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } } - private boolean _sent; - private boolean _sentSynchronously; - private boolean _response; - private Ice.ObjectPrxHelperBase _proxy; + protected Ice.ObjectPrxHelperBase _proxy; + + private Ice.ConnectionI _timerTaskConnection; + private TimerTask _timerTask; + private Ice._ObjectDel _delegate; private int _cnt; private Ice.OperationMode _mode; - private TimerTask _timerTask; + + private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); } |