summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsync.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-11-23 13:28:08 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-11-23 13:28:08 +0100
commit2c578015edcb36cdc0acd0227295de1dcca1b995 (patch)
treee163980b5dabb43a40089a29fdf8ff47a3e07f1c /java/src/IceInternal/OutgoingAsync.java
parentno longer generating inspect method for each Ruby exception (diff)
downloadice-2c578015edcb36cdc0acd0227295de1dcca1b995.tar.bz2
ice-2c578015edcb36cdc0acd0227295de1dcca1b995.tar.xz
ice-2c578015edcb36cdc0acd0227295de1dcca1b995.zip
New AMI mapping
Diffstat (limited to 'java/src/IceInternal/OutgoingAsync.java')
-rw-r--r--java/src/IceInternal/OutgoingAsync.java510
1 files changed, 246 insertions, 264 deletions
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 490b15a73ee..75f52715837 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,70 +9,212 @@
package IceInternal;
-public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
+public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback
{
- public final void
- __sent(final Ice.ConnectionI connection)
+ public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase callback)
{
- synchronized(__monitor)
+ super(((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, callback);
+ _proxy = (Ice.ObjectPrxHelperBase)prx;
+ }
+
+ public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
+ boolean explicitCtx)
+ {
+ _delegate = null;
+ _cnt = 0;
+ _mode = mode;
+ _sentSynchronously = false;
+
+ if(explicitCtx && ctx == null)
+ {
+ ctx = _emptyContext;
+ }
+
+ //
+ // Can't call async via a batch proxy.
+ //
+ if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
+ {
+ throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
+ }
+
+ _os.writeBlob(IceInternal.Protocol.requestHdr);
+
+ Reference ref = _proxy.__reference();
+
+ ref.getIdentity().__write(_os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ String facet = ref.getFacet();
+ if(facet == null || facet.length() == 0)
+ {
+ _os.writeStringSeq(null);
+ }
+ else
{
- _sent = true;
+ String[] facetPath = { facet };
+ _os.writeStringSeq(facetPath);
+ }
+
+ _os.writeString(operation);
+
+ _os.writeByte((byte)mode.ordinal());
- if(!_proxy.ice_isTwoway())
+ if(ctx != null)
+ {
+ //
+ // Explicit context
+ //
+ Ice.ContextHelper.write(_os, ctx);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
+ java.util.Map<String, String> prxContext = ref.getContext();
+
+ if(implicitContext == null)
{
- __releaseCallback();
+ Ice.ContextHelper.write(_os, prxContext);
}
- else if(_response)
+ else
{
- __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ implicitContext.write(prxContext, _os);
}
- else if(connection.timeout() >= 0)
+ }
+
+ _os.startWriteEncaps();
+ }
+
+ public Ice.ObjectPrx getProxy()
+ {
+ return _proxy;
+ }
+
+ public boolean __sent(final Ice.ConnectionI connection)
+ {
+ synchronized(_monitor)
+ {
+ boolean alreadySent = (_state & Sent) != 0;
+ _state |= Sent;
+
+ if((_state & Done) == 0)
{
- assert(_timerTask == null);
- _timerTask = new TimerTask()
+ if(!_proxy.ice_isTwoway())
{
- public void
- runTimerTask()
- {
- __runTimerTask(connection);
- }
- };
- _proxy.__reference().getInstance().timer().schedule(_timerTask, connection.timeout());
+ _state |= Done | OK;
+ }
+ else if(connection.timeout() > 0)
+ {
+ assert(_timerTaskConnection == null && _timerTask == null);
+ _timerTaskConnection = connection;
+ _timerTask = new TimerTask()
+ {
+ public void
+ runTimerTask()
+ {
+ __runTimerTask();
+ }
+ };
+ _instance.timer().schedule(_timerTask, connection.timeout());
+ }
}
+ _monitor.notifyAll();
+ return !alreadySent; // Don't call the sent call is already sent.
}
}
- public final void
- __finished(BasicStream is)
+ public void __sent()
+ {
+ __sentInternal();
+ }
+
+ public void __finished(Ice.LocalException exc, boolean sent)
+ {
+ synchronized(_monitor)
+ {
+ assert((_state & Done) == 0);
+ if(_timerTaskConnection != null)
+ {
+ _instance.timer().cancel(_timerTask);
+ _timerTaskConnection = null;
+ _timerTask = null;
+ }
+ }
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
+
+ try
+ {
+ int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
+ if(interval > 0)
+ {
+ _instance.retryQueue().add(this, interval);
+ }
+ else
+ {
+ __send(false);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ __exception(ex);
+ }
+ }
+
+ public final void __finished(LocalExceptionWrapper exc)
+ {
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback. The LocalExceptionWrapper exception is only called
+ // before the invocation is sent.
+ //
+
+ try
+ {
+ int interval = handleException(exc); // This will throw if the invocation can't be retried.
+ if(interval > 0)
+ {
+ _instance.retryQueue().add(this, interval);
+ }
+ else
+ {
+ __send(false);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ __exception(ex);
+ }
+ }
+
+ public final void __finished(BasicStream is)
{
assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
try
{
- synchronized(__monitor)
+ synchronized(_monitor)
{
- assert(__os != null);
- _response = true;
+ assert(_exception == null && (_state & Done) == 0);
- if(_timerTask != null && _proxy.__reference().getInstance().timer().cancel(_timerTask))
+ if(_timerTaskConnection != null)
{
- _timerTask = null; // Timer cancelled.
+ _instance.timer().cancel(_timerTask);
+ _timerTaskConnection = null;
+ _timerTask = null;
}
- while(!_sent || _timerTask != null)
- {
- try
- {
- __monitor.wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
- __is.swap(is);
- replyStatus = __is.readByte();
+ _is.swap(is);
+ replyStatus = _is.readByte();
switch(replyStatus)
{
@@ -87,12 +229,12 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
case ReplyStatus.replyOperationNotExist:
{
Ice.Identity id = new Ice.Identity();
- id.__read(__is);
+ id.__read(_is);
//
// For compatibility with the old FacetPath.
//
- String[] facetPath = __is.readStringSeq();
+ String[] facetPath = _is.readStringSeq();
String facet;
if(facetPath.length > 0)
{
@@ -107,7 +249,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
facet = "";
}
- String operation = __is.readString();
+ String operation = _is.readString();
Ice.RequestFailedException ex = null;
switch(replyStatus)
@@ -147,7 +289,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
case ReplyStatus.replyUnknownLocalException:
case ReplyStatus.replyUnknownUserException:
{
- String unknown = __is.readString();
+ String unknown = _is.readString();
Ice.UnknownException ex = null;
switch(replyStatus)
@@ -186,234 +328,61 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
throw new Ice.UnknownReplyStatusException();
}
}
+
+ _state |= Done;
+ if(replyStatus == ReplyStatus.replyOK)
+ {
+ _state |= OK;
+ }
+ _monitor.notifyAll();
}
}
catch(Ice.LocalException ex)
{
- __finished(ex);
+ __finished(ex, true);
return;
}
assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
-
- try
- {
- __response(replyStatus == ReplyStatus.replyOK);
- }
- catch(java.lang.Exception ex)
- {
- __warning(ex);
- __releaseCallback();
- }
+ __response();
}
-
- public final void
- __finished(Ice.LocalException exc)
+ public final boolean __send(boolean synchronous)
{
- synchronized(__monitor)
+ while(true)
{
- if(__os != null) // Might be called from __prepare or before __prepare
+ int interval = 0;
+ try
{
- if(_timerTask != null && _proxy.__reference().getInstance().timer().cancel(_timerTask))
- {
- _timerTask = null; // Timer cancelled.
- }
-
- while(_timerTask != null)
+ _delegate = _proxy.__getDelegate(true);
+ boolean sent = _delegate.__getRequestHandler().sendAsyncRequest(this);
+ if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread.
{
- try
- {
- __monitor.wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
+ _sentSynchronously = sent;
}
+ break;
}
- }
-
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
- //
-
- try
- {
- handleException(exc); // This will throw if the invocation can't be retried.
- }
- catch(Ice.LocalException ex)
- {
- __exception(ex);
- }
- }
-
- public final void
- __finished(LocalExceptionWrapper ex)
- {
- assert(__os != null && !_sent);
-
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback. The LocalExceptionWrapper exception is only called
- // before the invocation is sent.
- //
-
- try
- {
- handleException(ex); // This will throw if the invocation can't be retried.
- }
- catch(Ice.LocalException exc)
- {
- __exception(exc);
- }
- }
-
- public final void
- __retry(int cnt, int interval)
- {
- //
- // This method is called by the proxy to retry an invocation. It's safe to update
- // the count here without synchronization, no other threads can access this object.
- //
- _cnt = cnt;
- if(interval > 0)
- {
- assert(__os != null);
- __os.instance().retryQueue().add(this, interval);
- }
- else
- {
- __send();
- }
- }
-
- public final boolean
- __send()
- {
- try
- {
- _sent = false;
- _response = false;
- _delegate = _proxy.__getDelegate(true);
- _sentSynchronously = _delegate.__getRequestHandler().sendAsyncRequest(this);
- }
- catch(LocalExceptionWrapper ex)
- {
- handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
- }
- catch(Ice.LocalException ex)
- {
- handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
- }
- return _sentSynchronously;
- }
-
- protected final void
- __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map<String, String> context)
- {
- assert(__os != null);
-
- _proxy = (Ice.ObjectPrxHelperBase)prx;
- _delegate = null;
- _cnt = 0;
- _mode = mode;
- _sentSynchronously = false;
-
- //
- // Can't call async via a batch proxy.
- //
- if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
- {
- throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
- }
-
- __os.writeBlob(IceInternal.Protocol.requestHdr);
-
- Reference ref = _proxy.__reference();
-
- ref.getIdentity().__write(__os);
-
- //
- // For compatibility with the old FacetPath.
- //
- String facet = ref.getFacet();
- if(facet == null || facet.length() == 0)
- {
- __os.writeStringSeq(null);
- }
- else
- {
- String[] facetPath = { facet };
- __os.writeStringSeq(facetPath);
- }
-
- __os.writeString(operation);
-
- __os.writeByte((byte)mode.ordinal());
-
- if(context != null)
- {
- //
- // Explicit context
- //
- Ice.ContextHelper.write(__os, context);
- }
- else
- {
- //
- // Implicit context
- //
- Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
- java.util.Map<String, String> prxContext = ref.getContext();
-
- if(implicitContext == null)
+ catch(LocalExceptionWrapper ex)
{
- Ice.ContextHelper.write(__os, prxContext);
+ interval = handleException(ex);
}
- else
+ catch(Ice.LocalException ex)
{
- implicitContext.write(prxContext, __os);
+ interval = handleException(ex, false);
}
- }
-
- __os.startWriteEncaps();
- }
- protected abstract void __response(boolean ok);
-
- protected void
- __throwUserException()
- throws Ice.UserException
- {
- try
- {
- __is.startReadEncaps();
- __is.throwException();
- }
- catch(Ice.UserException ex)
- {
- __is.endReadEncaps();
- throw ex;
- }
- }
-
- private void
- handleException(LocalExceptionWrapper ex)
- {
- if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
- {
- _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
- }
- else
- {
- _proxy.__handleExceptionWrapper(_delegate, ex, this);
+ if(interval > 0)
+ {
+ _instance.retryQueue().add(this, interval);
+ return false;
+ }
}
+ return _sentSynchronously;
}
- private void
- handleException(Ice.LocalException exc)
+ private int handleException(Ice.LocalException exc, boolean sent)
{
+ Ice.IntHolder interval = new Ice.IntHolder(0);
try
{
//
@@ -426,7 +395,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
// "at-most-once" (see the implementation of the checkRetryAfterException method of
// the ProxyFactory class for the reasons why it can be useful).
//
- if(!_sent ||
+ if(!sent ||
exc instanceof Ice.CloseConnectionException ||
exc instanceof Ice.ObjectNotExistException)
{
@@ -444,32 +413,43 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
{
- _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt);
}
else
{
- _proxy.__handleExceptionWrapper(_delegate, ex, this);
+ _proxy.__handleExceptionWrapper(_delegate, ex);
}
}
catch(Ice.LocalException ex)
{
- _proxy.__handleException(_delegate, ex, this, _cnt);
+ _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt);
}
+ return interval.value;
}
- private final void
- __runTimerTask(Ice.ConnectionI connection)
+ private int handleException(LocalExceptionWrapper ex)
{
- synchronized(__monitor)
+ Ice.IntHolder interval = new Ice.IntHolder(0);
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ {
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt);
+ }
+ else
{
- assert(_timerTask != null && _sent); // Can only be set once the request is sent.
+ _proxy.__handleExceptionWrapper(_delegate, ex);
+ }
+ return interval.value;
+ }
- if(_response) // If the response was just received, don't close the connection.
- {
- connection = null;
- }
+ private final void
+ __runTimerTask()
+ {
+ Ice.ConnectionI connection;
+ synchronized(_monitor)
+ {
+ connection = _timerTaskConnection;
+ _timerTaskConnection = null;
_timerTask = null;
- __monitor.notifyAll();
}
if(connection != null)
@@ -478,12 +458,14 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
}
}
- private boolean _sent;
- private boolean _sentSynchronously;
- private boolean _response;
- private Ice.ObjectPrxHelperBase _proxy;
+ protected Ice.ObjectPrxHelperBase _proxy;
+
+ private Ice.ConnectionI _timerTaskConnection;
+ private TimerTask _timerTask;
+
private Ice._ObjectDel _delegate;
private int _cnt;
private Ice.OperationMode _mode;
- private TimerTask _timerTask;
+
+ private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
}