summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java395
1 files changed, 157 insertions, 238 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 184355e8e06..fcf104ed942 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,35 +9,22 @@
package IceInternal;
-public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
+public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
- public
- OutgoingAsync()
- {
- }
-
- public abstract void ice_exception(Ice.LocalException ex);
-
- public final BasicStream
- __os()
- {
- return __os;
- }
-
public final void
__sent(final Ice.ConnectionI connection)
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
_sent = true;
if(!_proxy.ice_isTwoway())
{
- cleanup(); // No response expected, we're done with the OutgoingAsync.
+ __release();
}
else if(_response)
{
- _monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
}
else if(connection.timeout() >= 0)
{
@@ -63,7 +50,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
byte replyStatus;
try
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
assert(__os != null);
_response = true;
@@ -77,7 +64,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
{
try
{
- _monitor.wait();
+ __monitor.wait();
}
catch(java.lang.InterruptedException ex)
{
@@ -216,14 +203,8 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
}
catch(java.lang.Exception ex)
{
- warning(ex);
- }
- finally
- {
- synchronized(_monitor)
- {
- cleanup();
- }
+ __warning(ex);
+ __release();
}
}
@@ -231,8 +212,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
public final void
__finished(Ice.LocalException exc)
{
- boolean retry = false;
- synchronized(_monitor)
+ synchronized(__monitor)
{
if(__os != null) // Might be called from __prepare or before __prepare
{
@@ -245,236 +225,213 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
{
try
{
- _monitor.wait();
+ __monitor.wait();
}
catch(java.lang.InterruptedException ex)
{
}
}
-
- //
- // A CloseConnectionException indicates graceful
- // server shutdown, and is therefore always repeatable
- // without violating "at-most-once". That's because by
- // sending a close connection message, the server
- // guarantees that all outstanding requests can safely
- // be repeated. Otherwise, we can also retry if the
- // operation mode is Nonmutating or Idempotent.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once".
- //
- if(!_sent ||
- _mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent ||
- exc instanceof Ice.CloseConnectionException || exc instanceof Ice.ObjectNotExistException)
- {
- retry = true;
- }
- }
- }
-
- if(retry)
- {
- try
- {
- _cnt = _proxy.__handleException(_delegate, exc, _cnt);
- __send();
- return;
- }
- catch(Ice.LocalException ex)
- {
}
}
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
try
{
- ice_exception(exc);
- }
- catch(java.lang.Exception ex)
- {
- warning(ex);
+ handleException(exc); // This will throw if the invocation can't be retried.
+ __send();
}
- finally
+ catch(Ice.LocalException ex)
{
- synchronized(_monitor)
- {
- cleanup();
- }
+ __exception(ex);
}
}
public final void
__finished(LocalExceptionWrapper ex)
{
+ assert(__os != null && !_sent);
+
//
- // NOTE: This is called if sendRequest/sendAsyncRequest fails with
- // a LocalExceptionWrapper exception. It's not possible for the
- // timer to be set at this point because the request couldn't be
- // sent.
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback. The LocalExceptionWrapper exception is only called
+ // before the invocation is sent.
//
- assert(!_sent && _timerTask == null);
try
{
- if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
- {
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
- }
- else
- {
- _proxy.__handleExceptionWrapper(_delegate, ex);
- }
+ handleException(ex); // This will throw if the invocation can't be retried.
__send();
}
catch(Ice.LocalException exc)
{
- try
- {
- ice_exception(exc);
- }
- catch(java.lang.Exception exl)
+ __exception(exc);
+ }
+ }
+
+ protected final void
+ __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context)
+ {
+ assert(__os != null);
+
+ _proxy = (Ice.ObjectPrxHelperBase)prx;
+ _delegate = null;
+ _cnt = 0;
+ _mode = mode;
+
+ //
+ // Can't call async via a batch proxy.
+ //
+ if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
+ {
+ throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
+ }
+
+ __os.writeBlob(IceInternal.Protocol.requestHdr);
+
+ Reference ref = _proxy.__reference();
+
+ ref.getIdentity().__write(__os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ String facet = ref.getFacet();
+ if(facet == null || facet.length() == 0)
+ {
+ __os.writeStringSeq(null);
+ }
+ else
+ {
+ String[] facetPath = { facet };
+ __os.writeStringSeq(facetPath);
+ }
+
+ __os.writeString(operation);
+
+ __os.writeByte((byte)mode.value());
+
+ if(context != null)
+ {
+ //
+ // Explicit context
+ //
+ Ice.ContextHelper.write(__os, context);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
+ java.util.Map prxContext = ref.getContext();
+
+ if(implicitContext == null)
{
- warning(exl);
+ Ice.ContextHelper.write(__os, prxContext);
}
- finally
+ else
{
- synchronized(_monitor)
- {
- cleanup();
- }
+ implicitContext.write(prxContext, __os);
}
}
+
+ __os.startWriteEncaps();
}
protected final void
- __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context)
+ __send()
{
- synchronized(_monitor)
+ while(true)
{
try
{
- //
- // We must first wait for other requests to finish.
- //
- while(__os != null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- //
- // Can't call async via a batch proxy.
- //
- _proxy = (Ice.ObjectPrxHelperBase)prx;
- if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
- {
- throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
- }
-
- _delegate = null;
- _cnt = 0;
- _mode = mode;
_sent = false;
- _response = false;
-
- Reference ref = _proxy.__reference();
- assert(__is == null);
- __is = new BasicStream(ref.getInstance());
- assert(__os == null);
- __os = new BasicStream(ref.getInstance());
-
- __os.writeBlob(IceInternal.Protocol.requestHdr);
-
- ref.getIdentity().__write(__os);
-
- //
- // For compatibility with the old FacetPath.
- //
- String facet = ref.getFacet();
- if(facet == null || facet.length() == 0)
- {
- __os.writeStringSeq(null);
- }
- else
- {
- String[] facetPath = { facet };
- __os.writeStringSeq(facetPath);
- }
-
- __os.writeString(operation);
-
- __os.writeByte((byte)mode.value());
-
- if(context != null)
- {
- //
- // Explicit context
- //
- Ice.ContextHelper.write(__os, context);
- }
- else
- {
- //
- // Implicit context
- //
- Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
- java.util.Map prxContext = ref.getContext();
-
- if(implicitContext == null)
- {
- Ice.ContextHelper.write(__os, prxContext);
- }
- else
- {
- implicitContext.write(prxContext, __os);
- }
- }
-
- __os.startWriteEncaps();
+ _response = false;
+ _delegate = _proxy.__getDelegate(true);
+ _delegate.__getRequestHandler().sendAsyncRequest(this);
+ return;
+ }
+ catch(LocalExceptionWrapper ex)
+ {
+ handleException(ex);
}
catch(Ice.LocalException ex)
{
- cleanup();
- throw ex;
+ handleException(ex);
}
}
}
- protected final void
- __send()
+ protected abstract void __response(boolean ok);
+
+ private void
+ handleException(LocalExceptionWrapper ex)
{
- //
- // NOTE: no synchronization needed. At this point, no other threads can be calling on this object.
- //
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ {
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy.__handleExceptionWrapper(_delegate, ex);
+ }
+ }
- RequestHandler handler;
+ private void
+ handleException(Ice.LocalException exc)
+ {
try
{
- _delegate = _proxy.__getDelegate(true);
- handler = _delegate.__getRequestHandler();
+ //
+ // A CloseConnectionException indicates graceful
+ // server shutdown, and is therefore always repeatable
+ // without violating "at-most-once". That's because by
+ // sending a close connection message, the server
+ // guarantees that all outstanding requests can safely
+ // be repeated.
+ //
+ // An ObjectNotExistException can always be retried as
+ // well without violating "at-most-once".
+ //
+ if(!_sent ||
+ exc instanceof Ice.CloseConnectionException ||
+ exc instanceof Ice.ObjectNotExistException)
+ {
+ throw exc;
+ }
+
+ //
+ // Throw the exception wrapped in a LocalExceptionWrapper, to
+ // indicate that the request cannot be resent without
+ // potentially violating the "at-most-once" principle.
+ //
+ throw new LocalExceptionWrapper(exc, false);
+ }
+ catch(LocalExceptionWrapper ex)
+ {
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ {
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy.__handleExceptionWrapper(_delegate, ex);
+ }
}
catch(Ice.LocalException ex)
{
- __finished(ex);
- return;
+ _cnt = _proxy.__handleException(_delegate, ex, _cnt);
}
-
- _sent = false;
- _response = false;
- handler.sendAsyncRequest(this);
}
- protected abstract void __response(boolean ok);
-
private final void
__runTimerTask(Ice.ConnectionI connection)
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
assert(_timerTask != null && _sent); // Can only be set once the request is sent.
@@ -483,7 +440,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
connection = null;
}
_timerTask = null;
- _monitor.notifyAll();
+ __monitor.notifyAll();
}
if(connection != null)
@@ -492,49 +449,11 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
}
}
- private final void
- warning(java.lang.Exception ex)
- {
- if(__os != null) // Don't print anything if cleanup() was already called.
- {
- Reference ref = _proxy.__reference();
- if(ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault(
- "Ice.Warn.AMICallback", 1) > 0)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtil.OutputBase out = new IceUtil.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by AMI callback:\n");
- ex.printStackTrace(pw);
- pw.flush();
- ref.getInstance().initializationData().logger.warning(sw.toString());
- }
- }
- }
-
- private final void
- cleanup()
- {
- assert(_timerTask == null);
-
- __is = null;
- __os = null;
-
- _monitor.notify();
- }
-
- protected BasicStream __is;
- protected BasicStream __os;
-
private boolean _sent;
private boolean _response;
private Ice.ObjectPrxHelperBase _proxy;
private Ice._ObjectDel _delegate;
private int _cnt;
private Ice.OperationMode _mode;
-
private TimerTask _timerTask;
-
- private final java.lang.Object _monitor = new java.lang.Object();
}