summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /java/src/IceInternal/OutgoingAsync.java
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java163
1 files changed, 49 insertions, 114 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 699477d4607..41c56076a94 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -21,7 +21,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
boolean explicitCtx)
{
- _delegate = null;
+ _handler = null;
_cnt = 0;
_mode = mode;
_sentSynchronously = false;
@@ -101,11 +101,17 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
public int
__send(Ice.ConnectionI connection, boolean compress, boolean response)
- throws LocalExceptionWrapper
+ throws RetryException
{
return connection.sendAsyncRequest(this, compress, response);
}
+ public int
+ __invokeCollocated(CollocatedRequestHandler handler)
+ {
+ return handler.invokeAsyncRequest(this);
+ }
+
public boolean
__sent()
{
@@ -129,7 +135,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_timeoutRequestHandler = null;
}
_state |= Done | OK;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization
}
_monitor.notifyAll();
return !alreadySent; // Don't call the sent call is already sent.
@@ -143,7 +149,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
public void
- __finished(Ice.LocalException exc, boolean sent)
+ __finished(Ice.Exception exc, boolean sent)
{
synchronized(_monitor)
{
@@ -165,56 +171,16 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
//
-
- try
- {
- int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
- if(interval > 0)
- {
- _instance.retryQueue().add(this, interval);
- }
- else
- {
- __invoke(false);
- }
- }
- catch(Ice.LocalException ex)
- {
- __invokeException(ex);
- }
- }
-
- public final void
- __finished(LocalExceptionWrapper exc)
- {
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback. The LocalExceptionWrapper exception is only called
- // before the invocation is sent.
- //
-
- if(_remoteObserver != null)
- {
- _remoteObserver.failed(exc.get().ice_name());
- _remoteObserver.detach();
- _remoteObserver = null;
- }
-
- assert(_timeoutRequestHandler == null);
-
try
{
- int interval = handleException(exc); // This will throw if the invocation can't be retried.
- if(interval > 0)
- {
- _instance.retryQueue().add(this, interval);
- }
- else
+ if(!handleException(exc, sent))
{
- __invoke(false);
+ return; // Can't be retried immediately.
}
+
+ __invoke(false); // Retry the invocation
}
- catch(Ice.LocalException ex)
+ catch(Ice.Exception ex)
{
__invokeException(ex);
}
@@ -396,12 +362,10 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
while(true)
{
- int interval = 0;
try
{
- _delegate = _proxy.__getDelegate(true);
- RequestHandler handler = _delegate.__getRequestHandler();
- int status = handler.sendAsyncRequest(this);
+ _handler = _proxy.__getRequestHandler(true);
+ int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
if(synchronous)
@@ -427,30 +391,27 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
if((_state & Done) == 0)
{
- int invocationTimeout = handler.getReference().getInvocationTimeout();
+ int invocationTimeout = _handler.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
_instance.timer().schedule(this, invocationTimeout);
- _timeoutRequestHandler = handler;
+ _timeoutRequestHandler = _handler;
}
}
}
}
break;
}
- catch(LocalExceptionWrapper ex)
- {
- interval = handleException(ex);
- }
- catch(Ice.LocalException ex)
+ catch(RetryException ex)
{
- interval = handleException(ex, false);
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
}
-
- if(interval > 0)
+ catch(Ice.Exception ex)
{
- _instance.retryQueue().add(this, interval);
- return false;
+ if(!handleException(ex, false)) // This will throw if the invocation can't be retried.
+ {
+ break; // Can't be retried immediately.
+ }
}
}
return _sentSynchronously;
@@ -488,78 +449,52 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
+ BasicStream
+ __getIs()
+ {
+ return _is;
+ }
+
public void
runTimerTask()
{
__runTimerTask();
}
- private int
- handleException(Ice.LocalException exc, boolean sent)
+ private boolean
+ handleException(Ice.Exception exc, boolean sent)
{
- Ice.IntHolder interval = new Ice.IntHolder(0);
try
{
- //
- // A CloseConnectionException indicates graceful server shutdown, and is therefore
- // always repeatable without violating "at-most-once". That's because by sending a
- // close connection message, the server guarantees that all outstanding requests
- // can safely be repeated.
- //
- // An ObjectNotExistException can always be retried as well without violating
- // "at-most-once" (see the implementation of the checkRetryAfterException method of
- // the ProxyFactory class for the reasons why it can be useful).
- //
- if(!sent ||
- exc instanceof Ice.CloseConnectionException ||
- exc instanceof Ice.ObjectNotExistException)
+ Ice.IntHolder interval = new Ice.IntHolder();
+ _cnt = _proxy.__handleException(exc, _handler, _mode, sent, interval, _cnt);
+ if(_observer != null)
{
- throw exc;
+ _observer.retried(); // Invocation is being retried.
}
-
- //
- // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the
- // request cannot be resent without potentially violating the "at-most-once"
- // principle.
- //
- throw new LocalExceptionWrapper(exc, false);
- }
- catch(LocalExceptionWrapper ex)
- {
- if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ if(interval.value > 0)
{
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer);
+ _instance.retryQueue().add(this, interval.value);
+ return false; // Don't retry immediately, the retry queue will take care of the retry.
}
else
{
- _proxy.__handleExceptionWrapper(_delegate, ex, _observer);
+ return true; // Retry immediately.
}
}
- catch(Ice.LocalException ex)
+ catch(Ice.Exception ex)
{
- _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt, _observer);
- }
- return interval.value;
- }
-
- private int
- handleException(LocalExceptionWrapper ex)
- {
- Ice.IntHolder interval = new Ice.IntHolder(0);
- if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
- {
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer);
- }
- else
- {
- _proxy.__handleExceptionWrapper(_delegate, ex, _observer);
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ throw ex;
}
- return interval.value;
}
protected Ice.ObjectPrxHelperBase _proxy;
- private Ice._ObjectDel _delegate;
+ private RequestHandler _handler;
private Ice.EncodingVersion _encoding;
private int _cnt;
private Ice.OperationMode _mode;