summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ProxyOutgoingAsyncBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/ProxyOutgoingAsyncBase.java')
-rw-r--r--java/src/IceInternal/ProxyOutgoingAsyncBase.java278
1 files changed, 278 insertions, 0 deletions
diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
new file mode 100644
index 00000000000..eca0de9fcf6
--- /dev/null
+++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
@@ -0,0 +1,278 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+//
+// Base class for proxy based invocations. This class handles the
+// retry for proxy invocations. It also ensures the child observer is
+// correct notified of failures and make sure the retry task is
+// correctly canceled when the invocation completes.
+//
+public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
+{
+ public static ProxyOutgoingAsyncBase check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (ProxyOutgoingAsyncBase)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
+ @Override
+ public Ice.ObjectPrx getProxy()
+ {
+ return _proxy;
+ }
+
+ @Override
+ public boolean sent()
+ {
+ return sent(!_proxy.ice_isTwoway());
+ }
+
+ @Override
+ public boolean completed(Ice.Exception exc)
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
+ try
+ {
+ _instance.retryQueue().add(this, handleException(exc));
+ return false;
+ }
+ catch(Ice.Exception ex)
+ {
+ return finished(ex); // No retries, we're done
+ }
+ }
+
+ public void retry()
+ {
+ invokeImpl(false);
+ }
+
+ public void abort(Ice.Exception ex)
+ {
+ assert(_childObserver == null);
+ if(finished(ex))
+ {
+ invokeCompletedAsync();
+ }
+ else if(ex instanceof Ice.CommunicatorDestroyedException)
+ {
+ //
+ // If it's a communicator destroyed exception, don't swallow
+ // it but instead notify the user thread. Even if no callback
+ // was provided.
+ //
+ throw ex;
+ }
+ }
+
+ protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate)
+ {
+ super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate);
+ _proxy = prx;
+ _mode = Ice.OperationMode.Normal;
+ _cnt = 0;
+ _sent = false;
+ }
+
+ protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate, BasicStream os)
+ {
+ super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate, os);
+ _proxy = prx;
+ _mode = Ice.OperationMode.Normal;
+ _cnt = 0;
+ _sent = false;
+ }
+
+ protected static Ice.AsyncResult checkImpl(Ice.AsyncResult r, Ice.ObjectPrx p, String operation)
+ {
+ check(r, operation);
+ if(r.getProxy() != p)
+ {
+ throw new IllegalArgumentException("Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ return r;
+ }
+
+ protected void invokeImpl(boolean userThread)
+ {
+ try
+ {
+ if(userThread)
+ {
+ int invocationTimeout = _proxy.__reference().getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _future = _instance.timer().schedule(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
+ }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ }
+ else // If not called from the user thread, it's called from the retry queue
+ {
+ checkCanceled(); // Cancellation exception aren't retriable
+ if(_observer != null)
+ {
+ _observer.retried();
+ }
+ }
+
+ while(true)
+ {
+ try
+ {
+ _sent = false;
+ _handler = _proxy.__getRequestHandler();
+ int status = _handler.sendAsyncRequest(this);
+ if((status & AsyncStatus.Sent) > 0)
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ invokeSent(); // Call the sent callback from the user thread.
+ }
+ }
+ else
+ {
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ invokeSentAsync(); // Call the sent callback from a client thread pool thread.
+ }
+ }
+ }
+ return; // We're done!
+ }
+ catch(RetryException ex)
+ {
+ handleRetryException(ex);
+ }
+ catch(Ice.Exception ex)
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+ final int interval = handleException(ex);
+ if(interval > 0)
+ {
+ _instance.retryQueue().add(this, interval);
+ return;
+ }
+ else if(_observer != null)
+ {
+ checkCanceled();
+ _observer.retried();
+ }
+ }
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ //
+ // If called from the user thread we re-throw, the exception
+ // will be catch by the caller and abort() will be called.
+ //
+ if(userThread)
+ {
+ throw ex;
+ }
+ else if(finished(ex)) // No retries, we're done
+ {
+ invokeCompletedAsync();
+ }
+ }
+ }
+
+ @Override
+ protected boolean sent(boolean done)
+ {
+ _sent = true;
+ if(done)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ }
+ return super.sent(done);
+ }
+
+ @Override
+ protected boolean finished(Ice.Exception ex)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ return super.finished(ex);
+ }
+
+ @Override
+ protected boolean finished(boolean ok)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ return super.finished(ok);
+ }
+
+ protected void handleRetryException(RetryException exc)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler and always retry.
+ }
+
+ protected int handleException(Ice.Exception exc)
+ {
+ Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
+ _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
+ return interval.value;
+ }
+
+ final protected Ice.ObjectPrxHelperBase _proxy;
+ protected RequestHandler _handler;
+ protected Ice.OperationMode _mode;
+
+ private java.util.concurrent.Future<?> _future;
+ private int _cnt;
+ private boolean _sent;
+}