summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java378
1 files changed, 224 insertions, 154 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index dc7720c70eb..d205d90abf5 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -19,117 +19,42 @@ public abstract class OutgoingAsync
public
OutgoingAsync()
{
- _is = null;
- _os = null;
+ __is = null;
+ __os = null;
}
public abstract void ice_exception(Ice.LocalException ex);
- public void
- __setup(Connection connection, Reference ref, String operation, Ice.OperationMode mode, java.util.Map context)
+ public final void
+ __finished(BasicStream is)
{
- try
- {
- _connection = connection;
- if(_is == null)
- {
- _is = new BasicStream(ref.instance);
- }
- else
- {
- _is.reset();
- }
- if(_os == null)
- {
- _os = new BasicStream(ref.instance);
- }
- else
- {
- _os.reset();
- }
+ //
+ // No mutex protection necessary, this function can only be
+ // called after __send() and __prepare() have completed.
+ //
- _connection.prepareRequest(_os);
+ assert(_reference != null);
+ assert(_connection != null);
- ref.identity.__write(_os);
- _os.writeStringSeq(ref.facet);
- _os.writeString(operation);
- _os.writeByte((byte)mode.value());
- if(context == null)
- {
- _os.writeSize(0);
- }
- else
- {
- final int sz = context.size();
- _os.writeSize(sz);
- if(sz > 0)
- {
- java.util.Iterator i = context.entrySet().iterator();
- while(i.hasNext())
- {
- java.util.Map.Entry entry = (java.util.Map.Entry)i.next();
- _os.writeString((String)entry.getKey());
- _os.writeString((String)entry.getValue());
- }
- }
- }
-
- //
- // Input and output parameters are always sent in an
- // encapsulation, which makes it possible to forward
- // requests as blobs.
- //
- _os.startWriteEncaps();
- }
- catch(RuntimeException ex)
- {
- destroy();
- throw ex;
- }
- }
+ int status;
- public void
- __invoke()
- {
try
{
- _os.endWriteEncaps();
-
- _connection.sendAsyncRequest(_os, this);
-
- if(_connection.timeout() >= 0)
+ if(__is != null)
{
- _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout();
+ __is.destroy();
}
- }
- catch(RuntimeException ex)
- {
- destroy();
- throw ex;
- }
- }
+ __is = new BasicStream(_reference.instance);
+ __is.swap(is);
- public void
- __finished(BasicStream is)
- {
- try
- {
- _is.swap(is);
- byte status = _is.readByte();
+ status = (int)__is.readByte();
- switch((int)status)
+ switch(status)
{
case DispatchStatus._DispatchOK:
- {
- _is.startReadEncaps();
- __response(true);
- break;
- }
-
case DispatchStatus._DispatchUserException:
{
- _is.startReadEncaps();
- __response(false);
+ __is.startReadEncaps();
break;
}
@@ -137,96 +62,154 @@ public abstract class OutgoingAsync
{
Ice.ObjectNotExistException ex = new Ice.ObjectNotExistException();
ex.id = new Ice.Identity();
- ex.id.__read(_is);
- ex.facet = _is.readStringSeq();
- ex.operation = _is.readString();
- ice_exception(ex);
- break;
+ ex.id.__read(__is);
+ ex.facet = __is.readStringSeq();
+ ex.operation = __is.readString();
+ throw ex;
}
case DispatchStatus._DispatchFacetNotExist:
{
Ice.FacetNotExistException ex = new Ice.FacetNotExistException();
ex.id = new Ice.Identity();
- ex.id.__read(_is);
- ex.facet = _is.readStringSeq();
- ex.operation = _is.readString();
- ice_exception(ex);
- break;
+ ex.id.__read(__is);
+ ex.facet = __is.readStringSeq();
+ ex.operation = __is.readString();
+ throw ex;
}
case DispatchStatus._DispatchOperationNotExist:
{
Ice.OperationNotExistException ex = new Ice.OperationNotExistException();
ex.id = new Ice.Identity();
- ex.id.__read(_is);
- ex.facet = _is.readStringSeq();
- ex.operation = _is.readString();
- ice_exception(ex);
- break;
+ ex.id.__read(__is);
+ ex.facet = __is.readStringSeq();
+ ex.operation = __is.readString();
+ throw ex;
}
case DispatchStatus._DispatchUnknownException:
{
Ice.UnknownException ex = new Ice.UnknownException();
- ex.unknown = _is.readString();
- ice_exception(ex);
- break;
+ ex.unknown = __is.readString();
+ throw ex;
}
case DispatchStatus._DispatchUnknownLocalException:
{
Ice.UnknownLocalException ex = new Ice.UnknownLocalException();
- ex.unknown = _is.readString();
- ice_exception(ex);
- break;
+ ex.unknown = __is.readString();
+ throw ex;
}
case DispatchStatus._DispatchUnknownUserException:
{
Ice.UnknownUserException ex = new Ice.UnknownUserException();
- ex.unknown = _is.readString();
- ice_exception(ex);
- break;
+ ex.unknown = __is.readString();
+ throw ex;
}
default:
{
- ice_exception(new Ice.UnknownReplyStatusException());
- break;
+ throw new Ice.UnknownReplyStatusException();
}
}
}
- catch(Exception ex)
+ catch(Ice.LocalException ex)
{
- warning(ex);
+ __finished(ex);
+ return;
+ }
+
+ assert(status == DispatchStatus._DispatchOK || status == DispatchStatus._DispatchUserException);
+
+ try
+ {
+ __response(status == DispatchStatus._DispatchOK);
}
- finally
+ catch(Exception ex)
{
- destroy();
+ warning(ex);
}
}
- public void
+ public final void
__finished(Ice.LocalException exc)
{
- try
+ //
+ // No mutex protection necessary, this function can only be called
+ // after __send() and __prepare() have completed.
+ //
+
+ assert(_reference != null);
+ //assert(_connection != null); // Might be null, if getConnection() failed.
+
+ if(_reference.locatorInfo != null)
{
- ice_exception(exc);
+ _reference.locatorInfo.clearObjectCache(_reference);
}
- catch(Exception ex)
+
+ boolean doRetry = false;
+
+ //
+ // A CloseConnectionException indicates graceful server
+ // shutdown, and is therefore always repeatable without
+ // violating "at-most-once". That's because by sending a close
+ // connection message, the server guarantees that all
+ // outstanding requests can safely be repeated. Otherwise, we
+ // can also retry if the operation mode Nonmutating or
+ // Idempotent.
+ //
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent ||
+ exc instanceof Ice.CloseConnectionException)
{
- warning(ex);
+ try
+ {
+ ProxyFactory proxyFactory = _reference.instance.proxyFactory();
+ if(proxyFactory != null)
+ {
+ _cnt = proxyFactory.checkRetryAfterException(exc, _cnt);
+ }
+ else
+ {
+ throw exc; // The communicator is already destroyed, so we cannot retry.
+ }
+
+ doRetry = true;
+ }
+ catch(Ice.LocalException ex)
+ {
+ }
}
- finally
+
+ if(doRetry)
{
- destroy();
+ _connection = null;
+ __send();
+ }
+ else
+ {
+ try
+ {
+ ice_exception(exc);
+ }
+ catch(Exception ex)
+ {
+ warning(ex);
+ }
}
}
- public boolean
+ public final boolean
__timedOut()
{
+ //
+ // No mutex protection necessary, this function can only be called
+ // after __send() and __prepare() have completed.
+ //
+
+ assert(_connection != null);
+
if(_connection.timeout() >= 0)
{
return System.currentTimeMillis() >= _absoluteTimeoutMillis;
@@ -237,39 +220,121 @@ public abstract class OutgoingAsync
}
}
- public BasicStream
- __is()
+ protected final void
+ __prepare(Reference ref, String operation, Ice.OperationMode mode, java.util.Map context)
{
- return _is;
- }
+ //
+ // No mutex protection necessary, using this object for a new
+ // AMI call while another one is in progress is not allowed
+ // and leads to undefined behavior.
+ //
+
+ _reference = ref;
+ _connection = _reference.getConnection();
+ _cnt = 0;
+ _mode = mode;
+
+ if(__os != null)
+ {
+ __os.destroy();
+ }
+ __os = new BasicStream(_reference.instance);
+
+ _connection.prepareRequest(__os);
- public BasicStream
- __os()
- {
- return _os;
- }
+ ref.identity.__write(__os);
+ __os.writeStringSeq(ref.facet);
+ __os.writeString(operation);
+ __os.writeByte((byte)mode.value());
+ if(context == null)
+ {
+ __os.writeSize(0);
+ }
+ else
+ {
+ final int sz = context.size();
+ __os.writeSize(sz);
+ if(sz > 0)
+ {
+ java.util.Iterator i = context.entrySet().iterator();
+ while(i.hasNext())
+ {
+ java.util.Map.Entry entry = (java.util.Map.Entry)i.next();
+ __os.writeString((String)entry.getKey());
+ __os.writeString((String)entry.getValue());
+ }
+ }
+ }
- protected abstract void __response(boolean ok);
+ __os.startWriteEncaps();
+ }
- private void
- destroy()
+ protected final void
+ __send()
{
- if(_is != null)
+ assert(_reference != null);
+ //assert(_connection != null); // Might be null, if called from __finished() for retry.
+
+ try
{
- _is.destroy();
- _is = null;
+ while(true)
+ {
+ if(_connection == null)
+ {
+ _connection = _reference.getConnection();
+ }
+
+ if(_connection.timeout() >= 0)
+ {
+ _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout();
+ }
+
+ try
+ {
+ _connection.sendAsyncRequest(__os, this);
+
+ //
+ // Don't do anything after sendAsyncRequest() returned
+ // without an exception. I such case, there will be
+ // callbacks, i.e., calls to the __finished()
+ // functions. Since there is no mutex protection, we
+ // cannot modify state here and in such callbacks.
+ //
+ return;
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(_reference.locatorInfo != null)
+ {
+ _reference.locatorInfo.clearObjectCache(_reference);
+ }
+
+ ProxyFactory proxyFactory = _reference.instance.proxyFactory();
+ if(proxyFactory != null)
+ {
+ _cnt = proxyFactory.checkRetryAfterException(ex, _cnt);
+ }
+ else
+ {
+ throw ex; // The communicator is already destroyed, so we cannot retry.
+ }
+ }
+
+ _connection = null;
+ }
}
- if(_os != null)
+ catch(Ice.LocalException ex)
{
- _os.destroy();
- _os = null;
+ __finished(ex);
}
}
- private void
+ protected abstract void __response(boolean ok);
+
+ private final void
warning(Exception ex)
{
- if(_os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ if(__os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
{
java.io.StringWriter sw = new java.io.StringWriter();
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
@@ -278,12 +343,17 @@ public abstract class OutgoingAsync
out.print("exception raised by AMI callback:\n");
ex.printStackTrace(pw);
pw.flush();
- _os.instance().logger().warning(sw.toString());
+ __os.instance().logger().warning(sw.toString());
}
}
+ protected BasicStream __is;
+ protected BasicStream __os;
+
+ private Reference _reference;
private Connection _connection;
+ private int _cnt;
+ private Ice.OperationMode _mode;
+
private long _absoluteTimeoutMillis;
- private BasicStream _is;
- private BasicStream _os;
}