summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java105
1 files changed, 69 insertions, 36 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 0618ffb93e9..84bfc477768 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
//
try
{
- if(!handleException(exc))
- {
- return; // Can't be retried immediately.
- }
-
- invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(Ice.Exception ex)
{
@@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
});
}
- public final void finished(BasicStream is)
+ public final boolean finished(BasicStream is)
{
+ //
+ // NOTE: this method is called from ConnectionI.parseMessage
+ // with the connection locked. Therefore, it must not invoke
+ // any user callbacks.
+ //
+
assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
@@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_childObserver.detach();
_childObserver = null;
}
-
+
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
-
+
// _is can already be initialized if the invocation is retried
if(_is == null)
{
@@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
_is.swap(is);
replyStatus = _is.readByte();
-
+
switch(replyStatus)
{
case ReplyStatus.replyOK:
@@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
- _state |= StateDone;
- // Clear buffer now, instead of waiting for AsyncResult
- // deallocation
- // _os.resize(0, false);
if(replyStatus == ReplyStatus.replyOK)
{
_state |= StateOK;
}
+ _state |= StateDone;
_monitor.notifyAll();
+
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
}
}
- catch(Ice.LocalException ex)
+ catch(Ice.Exception exc)
{
- finished(ex);
- return;
- }
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return false;
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(_monitor)
+ {
+ _state |= StateDone;
+ _exception = ex;
+ _monitor.notifyAll();
- assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- invokeCompleted();
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
+ }
+ }
+ }
}
public final boolean invoke(boolean synchronous)
@@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
}
- break;
}
catch(RetryException ex)
{
// Clear request handler and retry.
_proxy.__setRequestHandler(_handler, null);
+ continue;
}
catch(Ice.Exception ex)
{
// This will throw if the invocation can't be retried.
- if(!handleException(ex))
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return _sentSynchronously;
}
@@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
super.invokeExceptionAsync(ex);
}
- private boolean handleException(Ice.Exception exc)
+ private void handleException(Ice.Exception exc)
{
try
{
@@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
{
_observer.retried(); // Invocation is being retried.
}
- if(interval.value > 0)
- {
- _instance.retryQueue().add(this, interval.value);
- return false; // Don't retry immediately, the retry queue will
- // take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ _instance.retryQueue().add(this, interval.value);
}
catch(Ice.Exception ex)
{