diff options
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 105 |
1 files changed, 69 insertions, 36 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 0618ffb93e9..84bfc477768 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC // try { - if(!handleException(exc)) - { - return; // Can't be retried immediately. - } - - invoke(false); // Retry the invocation + handleException(exc); } catch(Ice.Exception ex) { @@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC }); } - public final void finished(BasicStream is) + public final boolean finished(BasicStream is) { + // + // NOTE: this method is called from ConnectionI.parseMessage + // with the connection locked. Therefore, it must not invoke + // any user callbacks. + // + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; @@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _childObserver.detach(); _childObserver = null; } - + if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - + // _is can already be initialized if the invocation is retried if(_is == null) { @@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: @@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } - _state |= StateDone; - // Clear buffer now, instead of waiting for AsyncResult - // deallocation - // _os.resize(0, false); if(replyStatus == ReplyStatus.replyOK) { _state |= StateOK; } + _state |= StateDone; _monitor.notifyAll(); + + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; } } - catch(Ice.LocalException ex) + catch(Ice.Exception exc) { - finished(ex); - return; - } + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return false; + } + catch(Ice.LocalException ex) + { + synchronized(_monitor) + { + _state |= StateDone; + _exception = ex; + _monitor.notifyAll(); - assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - invokeCompleted(); + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; + } + } + } } public final boolean invoke(boolean synchronous) @@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } } - break; } catch(RetryException ex) { // Clear request handler and retry. _proxy.__setRequestHandler(_handler, null); + continue; } catch(Ice.Exception ex) { // This will throw if the invocation can't be retried. - if(!handleException(ex)) - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return _sentSynchronously; } @@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC super.invokeExceptionAsync(ex); } - private boolean handleException(Ice.Exception exc) + private void handleException(Ice.Exception exc) { try { @@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC { _observer.retried(); // Invocation is being retried. } - if(interval.value > 0) - { - _instance.retryQueue().add(this, interval.value); - return false; // Don't retry immediately, the retry queue will - // take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + _instance.retryQueue().add(this, interval.value); } catch(Ice.Exception ex) { |