summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-10-10 12:03:07 +0200
commit570455a381e6620f8ddfcca448559d3fa545ba38 (patch)
treefe3fa45e6a643b473d9370babff6224b1a9d4dcb /java/src
parentFixed ICE-5726: provide deprecated public StringConverterPlugin (diff)
downloadice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.bz2
ice-570455a381e6620f8ddfcca448559d3fa545ba38.tar.xz
ice-570455a381e6620f8ddfcca448559d3fa545ba38.zip
Fixed invocation timeouts/interrupt issues, addded AsyncResult.cancel()
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AsyncResult.java7
-rw-r--r--java/src/Ice/CommunicatorI.java10
-rw-r--r--java/src/Ice/ConnectionI.java96
-rw-r--r--java/src/Ice/ObjectAdapterI.java4
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java108
-rw-r--r--java/src/IceInternal/AsyncResultI.java475
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java117
-rw-r--r--java/src/IceInternal/CancellationHandler.java15
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java65
-rw-r--r--java/src/IceInternal/CommunicatorFlushBatch.java (renamed from java/src/IceInternal/CommunicatorBatchOutgoingAsync.java)123
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java32
-rw-r--r--java/src/IceInternal/ConnectionBatchOutgoingAsync.java107
-rw-r--r--java/src/IceInternal/ConnectionFlushBatch.java128
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java12
-rw-r--r--java/src/IceInternal/GetConnectionOutgoingAsync.java148
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java2
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java2
-rw-r--r--java/src/IceInternal/OutgoingAsync.java623
-rw-r--r--java/src/IceInternal/OutgoingAsyncBase.java571
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java53
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java2
-rw-r--r--java/src/IceInternal/ProxyBatchOutgoingAsync.java97
-rw-r--r--java/src/IceInternal/ProxyFactory.java2
-rw-r--r--java/src/IceInternal/ProxyFlushBatch.java71
-rw-r--r--java/src/IceInternal/ProxyGetConnection.java59
-rw-r--r--java/src/IceInternal/ProxyOutgoingAsyncBase.java278
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java13
-rw-r--r--java/src/IceInternal/RequestHandler.java6
-rw-r--r--java/src/IceInternal/RetryQueue.java13
-rw-r--r--java/src/IceInternal/RetryTask.java44
30 files changed, 1533 insertions, 1750 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index df247ac9358..477dec5f5ac 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -16,6 +16,13 @@ package Ice;
**/
public interface AsyncResult
{
+ /**
+ * If not completed, cancels the request. This is a local
+ * operation, it won't cancel the request on the server side. The
+ * request won't be sent if it was waiting to be sent or the
+ * response will be ignored if it received after the callback.
+ **/
+ public void cancel();
/**
* Returns the communicator that sent the invocation.
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 76d46baab6a..fab8cf98605 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -263,8 +263,10 @@ public final class CommunicatorI implements Communicator
// This callback object receives the results of all invocations
// of Connection.begin_flushBatchRequests.
//
- IceInternal.CommunicatorBatchOutgoingAsync result =
- new IceInternal.CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb);
+ IceInternal.CommunicatorFlushBatch result = new IceInternal.CommunicatorFlushBatch(this,
+ _instance,
+ __flushBatchRequests_name,
+ cb);
connectionFactory.flushAsyncBatchRequests(result);
adapterFactory.flushAsyncBatchRequests(result);
@@ -282,8 +284,8 @@ public final class CommunicatorI implements Communicator
public void
end_flushBatchRequests(AsyncResult r)
{
- IceInternal.OutgoingAsyncBase ri = (IceInternal.OutgoingAsyncBase)r;
- IceInternal.OutgoingAsyncBase.check(ri, this, __flushBatchRequests_name);
+ IceInternal.CommunicatorFlushBatch ri =
+ IceInternal.CommunicatorFlushBatch.check(r, this, __flushBatchRequests_name);
ri.__wait();
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 73880042340..9ee6ac10121 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -9,7 +9,8 @@
package Ice;
-public final class ConnectionI extends IceInternal.EventHandler implements Connection, IceInternal.ResponseHandler
+public final class ConnectionI extends IceInternal.EventHandler
+ implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler
{
public interface StartCallback
{
@@ -373,8 +374,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() -
- IceInternal.Protocol.headerSize - 4);
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
int status;
try
@@ -388,6 +388,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException) _exception.fillInStackTrace();
}
+ if(response || (status & IceInternal.AsyncStatus.Queued) > 0)
+ {
+ out.cancelable(this); // Notify the request that it's cancelable
+ }
+
if(response)
{
//
@@ -640,29 +645,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
{
- IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this,
- _communicator, _instance, __flushBatchRequests_name, cb);
- try
- {
- result.__invoke();
- }
- catch(LocalException __ex)
- {
- result.invokeExceptionAsync(__ex);
- }
-
+ IceInternal.ConnectionFlushBatch result =
+ new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb);
+ result.invoke();
return result;
}
@Override
public void end_flushBatchRequests(AsyncResult ir)
{
- IceInternal.OutgoingAsyncBase r = (IceInternal.OutgoingAsyncBase) ir;
- IceInternal.OutgoingAsyncBase.check(r, this, __flushBatchRequests_name);
+ IceInternal.ConnectionFlushBatch r =
+ IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name);
r.__wait();
}
- synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
+ synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync)
{
waitBatchStreamInUse();
@@ -687,11 +684,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.pos(IceInternal.Protocol.headerSize);
_batchStream.writeInt(_batchRequestNum);
- outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() -
- IceInternal.Protocol.headerSize - 4);
-
_batchStream.swap(outAsync.getOs());
+ outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
+
//
// Send the batch stream.
//
@@ -708,6 +704,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException) _exception.fillInStackTrace();
}
+ if((status & IceInternal.AsyncStatus.Queued) > 0)
+ {
+ outAsync.cancelable(this); // Notify the request that it's cancelable.
+ }
+
//
// Reset the batch stream.
//
@@ -728,12 +729,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(callback != null)
{
- class CallbackWorkItem extends IceInternal.DispatchWorkItem
+ _threadPool.dispatch(new IceInternal.DispatchWorkItem(this)
{
- public CallbackWorkItem()
- {
- }
-
@Override
public void run()
{
@@ -746,8 +743,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
}
- };
- _threadPool.dispatch(new CallbackWorkItem());
+ });
}
}
else
@@ -793,9 +789,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
}
- synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync,
- Ice.LocalException ex)
+ @Override
+ synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
{
@@ -815,13 +816,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// it's fine if the OutgoingAsync output stream is released (and
// as long as canceled requests cannot be retried).
//
- o.timedOut();
+ o.canceled();
if(o != _sendStreams.getFirst())
{
it.remove();
}
- outAsync.dispatchInvocationCancel(ex, _threadPool, this);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
}
@@ -834,12 +838,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(it2.next() == o)
{
it2.remove();
- outAsync.dispatchInvocationCancel(ex, _threadPool, this);
- return true; // We're done.
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
}
}
- return false;
}
@Override
@@ -1469,7 +1474,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
for(OutgoingMessage p : _sendStreams)
{
- p.finished(_exception);
+ p.completed(_exception);
if(p.requestId > 0) // Make sure finished isn't called twice.
{
_asyncRequests.remove(p.requestId);
@@ -1480,7 +1485,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
for(IceInternal.OutgoingAsync p : _asyncRequests.values())
{
- p.finished(_exception);
+ if(p.completed(_exception))
+ {
+ p.invokeCompleted();
+ }
}
_asyncRequests.clear();
@@ -2580,7 +2588,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
- if(outAsync != null && outAsync.finished(info.stream))
+ if(outAsync != null && outAsync.completed(info.stream))
{
info.outAsync = outAsync;
++info.messageDispatchCount;
@@ -2999,7 +3007,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = 0;
}
- OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress,
+ OutgoingMessage(IceInternal.OutgoingAsyncBase out, IceInternal.BasicStream stream, boolean compress,
int requestId)
{
this.stream = stream;
@@ -3008,7 +3016,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = requestId;
}
- public void timedOut()
+ public void canceled()
{
assert (outAsync != null);
outAsync = null;
@@ -3035,16 +3043,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return false;
}
- public void finished(Ice.LocalException ex)
+ public void completed(Ice.LocalException ex)
{
- if(outAsync != null)
+ if(outAsync != null && outAsync.completed(ex))
{
- outAsync.finished(ex);
+ outAsync.invokeCompleted();
}
}
public IceInternal.BasicStream stream;
- public IceInternal.OutgoingAsyncMessageCallback outAsync;
+ public IceInternal.OutgoingAsyncBase outAsync;
public boolean compress;
public int requestId;
boolean adopt;
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 7538fc16ced..48ad2739082 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -13,8 +13,6 @@ import java.util.Map;
public final class ObjectAdapterI implements ObjectAdapter
{
-
-
@Override
public String
getName()
@@ -751,7 +749,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
public void
- flushAsyncBatchRequests(IceInternal.CommunicatorBatchOutgoingAsync outAsync)
+ flushAsyncBatchRequests(IceInternal.CommunicatorFlushBatch outAsync)
{
java.util.List<IceInternal.IncomingConnectionFactory> f;
synchronized(this)
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 17b426c69cb..3f7bb25d9ad 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -286,11 +286,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.BasicStream __os = __result.startWriteParams(Ice.FormatType.DefaultFormat);
__os.writeString(__id);
__result.endWriteParams();
- __result.invoke(true);
+ __result.invoke();
}
catch(Exception __ex)
{
- __result.invokeExceptionAsync(__ex);
+ __result.abort(__ex);
}
return __result;
}
@@ -305,8 +305,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final boolean
end_ice_isA(AsyncResult __iresult)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_isA_name);
+ IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_isA_name);
try
{
if(!__result.__wait())
@@ -544,11 +543,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
__result.prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
__result.writeEmptyParams();
- __result.invoke(true);
+ __result.invoke();
}
catch(Exception __ex)
{
- __result.invokeExceptionAsync(__ex);
+ __result.abort(__ex);
}
return __result;
}
@@ -777,11 +776,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
__result.prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
__result.writeEmptyParams();
- __result.invoke(true);
+ __result.invoke();
}
catch(Exception __ex)
{
- __result.invokeExceptionAsync(__ex);
+ __result.abort(__ex);
}
return __result;
}
@@ -797,8 +796,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final String[]
end_ice_ids(AsyncResult __iresult)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_ids_name);
+ IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_ids_name);
try
{
if(!__result.__wait())
@@ -1054,11 +1052,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
__result.prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
__result.writeEmptyParams();
- __result.invoke(true);
+ __result.invoke();
}
catch(Exception __ex)
{
- __result.invokeExceptionAsync(__ex);
+ __result.abort(__ex);
}
return __result;
}
@@ -1073,8 +1071,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final String
end_ice_id(AsyncResult __iresult)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_id_name);
+ IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_id_name);
try
{
if(!__result.__wait())
@@ -1444,11 +1441,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
__result.prepare(operation, mode, __context, __explicitCtx, __synchronous);
__result.writeParamEncaps(inParams);
- __result.invoke(true);
+ __result.invoke();
}
catch(Exception __ex)
{
- __result.invokeExceptionAsync(__ex);
+ __result.abort(__ex);
}
return __result;
}
@@ -1468,8 +1465,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final boolean
end_ice_invoke(ByteSeqHolder outParams, AsyncResult __iresult)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_invoke_name);
+ IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __ice_invoke_name);
try
{
boolean ok = __result.__wait();
@@ -2427,49 +2423,47 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private static final String __ice_getConnection_name = "ice_getConnection";
private AsyncResult
- begin_ice_getConnectionInternal(IceInternal.CallbackBase __cb)
+ begin_ice_getConnectionInternal(IceInternal.CallbackBase cb)
{
- IceInternal.GetConnectionOutgoingAsync __result =
- new IceInternal.GetConnectionOutgoingAsync(this, __ice_getConnection_name, __cb);
+ IceInternal.ProxyGetConnection result = new IceInternal.ProxyGetConnection(this, __ice_getConnection_name, cb);
try
{
- __result.__invoke();
+ result.invoke();
}
- catch(Exception __ex)
+ catch(Exception ex)
{
- __result.invokeExceptionAsync(__ex);
+ result.abort(ex);
}
- return __result;
+ return result;
}
@Override
public Ice.Connection
- end_ice_getConnection(AsyncResult __iresult)
+ end_ice_getConnection(AsyncResult r)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_getConnection_name);
- __result.__wait();
+ IceInternal.ProxyGetConnection result = IceInternal.ProxyGetConnection.check(r, this, __ice_getConnection_name);
+ result.__wait();
return ice_getCachedConnection();
}
- static public final void __ice_getConnection_completed(TwowayCallbackArg1<Ice.Connection> __cb, AsyncResult __result)
+ static public final void __ice_getConnection_completed(TwowayCallbackArg1<Ice.Connection> cb, AsyncResult result)
{
- Ice.Connection __ret = null;
+ Ice.Connection ret = null;
try
{
- __ret = __result.getProxy().end_ice_getConnection(__result);
+ ret = result.getProxy().end_ice_getConnection(result);
}
- catch(LocalException __ex)
+ catch(LocalException ex)
{
- __cb.exception(__ex);
+ cb.exception(ex);
return;
}
- catch(SystemException __ex)
+ catch(SystemException ex)
{
- __cb.exception(__ex);
+ cb.exception(ex);
return;
}
- __cb.response(__ret);
+ cb.response(ret);
}
/**
@@ -2578,28 +2572,26 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private static final String __ice_flushBatchRequests_name = "ice_flushBatchRequests";
private AsyncResult
- begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase __cb)
+ begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
{
- IceInternal.ProxyBatchOutgoingAsync __result =
- new IceInternal.ProxyBatchOutgoingAsync(this, __ice_flushBatchRequests_name, __cb);
+ IceInternal.ProxyFlushBatch result = new IceInternal.ProxyFlushBatch(this, __ice_flushBatchRequests_name, cb);
try
{
- __result.__invoke();
+ result.invoke();
}
- catch(Exception __ex)
+ catch(Exception ex)
{
- __result.invokeExceptionAsync(__ex);
+ result.abort(ex);
}
- return __result;
+ return result;
}
@Override
public void
- end_ice_flushBatchRequests(AsyncResult __iresult)
+ end_ice_flushBatchRequests(AsyncResult r)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, __ice_flushBatchRequests_name);
- __result.__wait();
+ IceInternal.ProxyFlushBatch result = IceInternal.ProxyFlushBatch.check(r, this, __ice_flushBatchRequests_name);
+ result.__wait();
}
/**
@@ -2721,34 +2713,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
public final void
- __end(AsyncResult __iresult, String operation)
+ __end(AsyncResult r, String operation)
{
- IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
- IceInternal.OutgoingAsyncBase.check(__result, this, operation);
+ IceInternal.ProxyOutgoingAsyncBase result = IceInternal.ProxyOutgoingAsyncBase.check(r, this, operation);
try
{
- boolean ok = __result.__wait();
+ boolean ok = result.__wait();
if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
{
+ IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)result;
if(!ok)
{
try
{
- __result.throwUserException();
+ outAsync.throwUserException();
}
- catch(UserException __ex)
+ catch(UserException ex)
{
- throw new UnknownUserException(__ex.ice_name(), __ex);
+ throw new UnknownUserException(ex.ice_name(), ex);
}
}
- __result.readEmptyParams();
+ outAsync.readEmptyParams();
}
}
finally
{
- if(__result != null)
+ if(result != null)
{
- __result.cacheMessageBuffers();
+ result.cacheMessageBuffers();
}
}
}
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
new file mode 100644
index 00000000000..9d7445584fe
--- /dev/null
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -0,0 +1,475 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import Ice.AsyncResult;
+import Ice.Communicator;
+import Ice.CommunicatorDestroyedException;
+import Ice.Connection;
+
+public class AsyncResultI implements AsyncResult
+{
+ @Override
+ public void cancel()
+ {
+ cancel(new Ice.InvocationCanceledException());
+ }
+
+ @Override
+ public Communicator getCommunicator()
+ {
+ return _communicator;
+ }
+
+ @Override
+ public Connection getConnection()
+ {
+ return null;
+ }
+
+ @Override
+ public Ice.ObjectPrx getProxy()
+ {
+ return null;
+ }
+
+ @Override
+ public final boolean isCompleted()
+ {
+ synchronized(this)
+ {
+ return (_state & StateDone) > 0;
+ }
+ }
+
+ @Override
+ public final void waitForCompleted()
+ {
+ synchronized(this)
+ {
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ while((_state & StateDone) == 0)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+ }
+ }
+
+ @Override
+ public final boolean isSent()
+ {
+ synchronized(this)
+ {
+ return (_state & StateSent) > 0;
+ }
+ }
+
+ @Override
+ public final void waitForSent()
+ {
+ synchronized(this)
+ {
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ while((_state & StateSent) == 0 && _exception == null)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+ }
+ }
+
+ @Override
+ public final void throwLocalException()
+ {
+ synchronized(this)
+ {
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ }
+ }
+
+ @Override
+ public final boolean sentSynchronously()
+ {
+ return _sentSynchronously; // No lock needed, immutable
+ }
+
+ @Override
+ public final String getOperation()
+ {
+ return _operation;
+ }
+
+ public final void invokeSent()
+ {
+ assert(_callback != null);
+
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
+ }
+
+ try
+ {
+ _callback.__sent(this);
+ }
+ catch(RuntimeException ex)
+ {
+ warning(ex);
+ }
+ catch(AssertionError exc)
+ {
+ error(exc);
+ }
+ catch(OutOfMemoryError exc)
+ {
+ error(exc);
+ }
+ finally
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(null);
+ }
+ }
+
+ if(_observer != null)
+ {
+ Ice.ObjectPrx proxy = getProxy();
+ if(proxy == null || !proxy.ice_isTwoway())
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ }
+ }
+
+ public final void invokeCompleted()
+ {
+ assert(_callback != null);
+
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
+ }
+
+ try
+ {
+ _callback.__completed(this);
+ }
+ catch(RuntimeException ex)
+ {
+ warning(ex);
+ }
+ catch(AssertionError exc)
+ {
+ error(exc);
+ }
+ catch(OutOfMemoryError exc)
+ {
+ error(exc);
+ }
+ finally
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(null);
+ }
+ }
+
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ }
+
+ public final void invokeCompletedAsync()
+ {
+ //
+ // CommunicatorDestroyedCompleted is the only exception that can propagate directly
+ // from this method.
+ //
+ _instance.clientThreadPool().dispatch(new DispatchWorkItem(_cachedConnection)
+ {
+ @Override
+ public void run()
+ {
+ invokeCompleted();
+ }
+ });
+ }
+
+ public void cancelable(final CancellationHandler handler)
+ {
+ synchronized(this)
+ {
+ if(_cancellationException == null)
+ {
+ _cancellationHandler = handler;
+ return;
+ }
+ }
+ handler.asyncRequestCanceled((OutgoingAsyncBase)this, _cancellationException);
+ }
+
+ public final boolean __wait()
+ {
+ try
+ {
+ synchronized(this)
+ {
+ if((_state & StateEndCalled) > 0)
+ {
+ throw new IllegalArgumentException("end_ method called more than once");
+ }
+
+ _state |= StateEndCalled;
+ if(Thread.interrupted())
+ {
+ throw new InterruptedException();
+ }
+ while((_state & StateDone) == 0)
+ {
+ this.wait();
+ }
+
+ if(_exception != null)
+ {
+ throw (Ice.Exception)_exception.fillInStackTrace();
+ }
+
+ return (_state & StateOK) > 0;
+ }
+ }
+ catch(InterruptedException ex)
+ {
+ Ice.OperationInterruptedException exc = new Ice.OperationInterruptedException();
+ cancel(exc); // Must be called outside the synchronization
+ throw exc;
+ }
+ }
+
+ public void cacheMessageBuffers()
+ {
+ }
+
+ protected AsyncResultI(Communicator communicator, Instance instance, String op, CallbackBase del)
+ {
+ _communicator = communicator;
+ _instance = instance;
+ _operation = op;
+ _state = 0;
+ _sentSynchronously = false;
+ _exception = null;
+ _callback = del;
+ }
+
+ protected boolean sent(boolean done)
+ {
+ synchronized(this)
+ {
+ assert(_exception == null);
+
+ boolean alreadySent = (_state & StateSent) != 0;
+ _state |= StateSent;
+ if(done)
+ {
+ _state |= StateDone | StateOK;
+ _cancellationHandler = null;
+ if(_observer != null && (_callback == null || !_callback.__hasSentCallback()))
+ {
+ _observer.detach();
+ _observer = null;
+ }
+
+ //
+ // For oneway requests after the data has been sent
+ // the buffers can be reused unless this is a
+ // collocated invocation. For collocated invocations
+ // the buffer won't be reused because it has already
+ // been marked as cached in invokeCollocated.
+ //
+ cacheMessageBuffers();
+ }
+ this.notifyAll();
+ return !alreadySent && _callback != null && _callback.__hasSentCallback();
+ }
+ }
+
+ protected boolean finished(boolean ok)
+ {
+ synchronized(this)
+ {
+ _state |= StateDone;
+ if(ok)
+ {
+ _state |= StateOK;
+ }
+ _cancellationHandler = null;
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ }
+ this.notifyAll();
+ return _callback != null;
+ }
+ }
+
+ protected boolean finished(Ice.Exception ex)
+ {
+ synchronized(this)
+ {
+ _state |= StateDone;
+ _exception = ex;
+ _cancellationHandler = null;
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ }
+ this.notifyAll();
+ return _callback != null;
+ }
+ }
+
+ protected final void invokeSentAsync()
+ {
+ //
+ // This is called when it's not safe to call the sent callback
+ // synchronously from this thread. Instead the exception callback
+ // is called asynchronously from the client thread pool.
+ //
+ try
+ {
+ _instance.clientThreadPool().dispatch(new DispatchWorkItem(_cachedConnection)
+ {
+ @Override
+ public void run()
+ {
+ invokeSent();
+ }
+ });
+ }
+ catch(CommunicatorDestroyedException exc)
+ {
+ }
+ }
+
+ protected void cancel(Ice.LocalException ex)
+ {
+ synchronized(this)
+ {
+ _cancellationException = ex;
+ if(_cancellationHandler == null)
+ {
+ return;
+ }
+ }
+ _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
+ }
+
+ protected void checkCanceled()
+ {
+ synchronized(this)
+ {
+ if(_cancellationException != null)
+ {
+ throw _cancellationException;
+ }
+ }
+ }
+
+ protected Ice.Instrumentation.InvocationObserver getObserver()
+ {
+ return _observer;
+ }
+
+ protected static void check(AsyncResult r, String operation)
+ {
+ if(r == null)
+ {
+ throw new IllegalArgumentException("AsyncResult == null");
+ }
+ else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality
+ {
+ throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " +
+ r.getOperation());
+ }
+ }
+
+ private final void warning(RuntimeException ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ String s = "exception raised by AMI callback:\n" + Ex.toString(ex);
+ _instance.initializationData().logger.warning(s);
+ }
+ }
+
+ private final void error(Error error)
+ {
+ String s = "error raised by AMI callback:\n" + Ex.toString(error);
+ _instance.initializationData().logger.error(s);
+ }
+
+ protected final Instance _instance;
+ protected Ice.Instrumentation.InvocationObserver _observer;
+ protected Connection _cachedConnection;
+ protected boolean _sentSynchronously;
+
+ private final Communicator _communicator;
+ private final String _operation;
+ private final CallbackBase _callback;
+
+ private Ice.Exception _exception;
+
+ private CancellationHandler _cancellationHandler;
+ private Ice.LocalException _cancellationException;
+
+ protected static final byte StateOK = 0x1;
+ protected static final byte StateDone = 0x2;
+ protected static final byte StateSent = 0x4;
+ protected static final byte StateEndCalled = 0x8;
+ protected static final byte StateCachedBuffers = 0x10;
+ protected byte _state;
+}
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
deleted file mode 100644
index ff8ba712185..00000000000
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ /dev/null
@@ -1,117 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-abstract public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
-{
- BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback)
- {
- super(communicator, instance, operation, callback);
- }
-
- @Override
- public int
- send(Ice.ConnectionI connection, boolean compress, boolean response)
- {
- _cachedConnection = connection;
- return connection.flushAsyncBatchRequests(this);
- }
-
- @Override
- public int
- invokeCollocated(CollocatedRequestHandler handler)
- {
- return handler.invokeAsyncBatchRequests(this);
- }
-
- @Override
- public boolean
- sent()
- {
- synchronized(_monitor)
- {
- _state |= StateDone | StateOK | StateSent;
- //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- if(_timeoutRequestHandler != null)
- {
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
- }
- _monitor.notifyAll();
-
- if(_callback == null || !_callback.__hasSentCallback())
- {
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return false;
- }
- return true;
- }
- }
-
- @Override
- public void
- invokeSent()
- {
- invokeSentInternal();
- }
-
- @Override
- public void
- finished(Ice.Exception exc)
- {
- synchronized(_monitor)
- {
- if(_childObserver != null)
- {
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- if(_timeoutRequestHandler != null)
- {
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
- }
- }
- invokeException(exc);
- }
-
- @Override
- public void
- processRetry()
- {
- assert(false); // Retries are never scheduled
- }
-
- @Override
- public void
- dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
- {
- threadPool.dispatch(new DispatchWorkItem(connection)
- {
- @Override
- public void run()
- {
- BatchOutgoingAsync.this.finished(ex);
- }
- });
- }
-}
diff --git a/java/src/IceInternal/CancellationHandler.java b/java/src/IceInternal/CancellationHandler.java
new file mode 100644
index 00000000000..0182f403cb4
--- /dev/null
+++ b/java/src/IceInternal/CancellationHandler.java
@@ -0,0 +1,15 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public interface CancellationHandler
+{
+ void asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex);
+}
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 8de4a93ebc4..48d7bfa5b7d 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -13,8 +13,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
private class InvokeAllAsync extends DispatchWorkItem
{
- private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
- boolean batch)
+ private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch)
{
_outAsync = outAsync;
_os = os;
@@ -32,7 +31,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- private final OutgoingAsyncMessageCallback _outAsync;
+ private final OutgoingAsyncBase _outAsync;
private BasicStream _os;
private final int _requestId;
private final int _invokeNum;
@@ -186,14 +185,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public int
- sendAsyncRequest(OutgoingAsyncMessageCallback outAsync)
+ sendAsyncRequest(OutgoingAsyncBase outAsync)
{
return outAsync.invokeCollocated(this);
}
@Override
- synchronized public boolean
- asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
+ synchronized public void
+ asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
Integer requestId = _sendAsyncRequests.get(outAsync);
if(requestId != null)
@@ -203,8 +202,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
if(outAsync instanceof OutgoingAsync)
@@ -216,12 +218,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
}
}
- return false;
}
@Override
@@ -242,9 +246,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
outAsync = _asyncRequests.remove(requestId);
+ if(outAsync != null && !outAsync.completed(os))
+ {
+ outAsync = null;
+ }
}
- if(outAsync != null && outAsync.finished(os))
+ if(outAsync != null)
{
outAsync.invokeCompleted();
}
@@ -271,18 +279,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
public void
invokeException(int requestId, Ice.LocalException ex, int invokeNum)
{
- if(requestId > 0)
- {
- OutgoingAsync outAsync = null;
- synchronized(this)
- {
- outAsync = _asyncRequests.remove(requestId);
- }
- if(outAsync != null)
- {
- outAsync.finished(ex);
- }
- }
+ handleException(requestId, ex);
_adapter.decDirectCount();
}
@@ -307,7 +304,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return null;
}
- void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
+ int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
int requestId = 0;
if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response)
@@ -323,6 +320,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_sendAsyncRequests.put(outAsync, requestId);
}
+ outAsync.cancelable(this);
}
}
@@ -355,9 +353,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
}
+ return AsyncStatus.Queued;
}
- int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
+ int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync)
{
int invokeNum;
synchronized(this)
@@ -370,6 +369,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
_sendAsyncRequests.put(outAsync, 0);
+ outAsync.cancelable(this);
}
assert(!_batchStream.isEmpty());
@@ -404,7 +404,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private boolean
- sentAsync(final OutgoingAsyncMessageCallback outAsync)
+ sentAsync(final OutgoingAsyncBase outAsync)
{
if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
@@ -511,10 +511,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized(this)
{
outAsync = _asyncRequests.remove(requestId);
+ if(outAsync != null && !outAsync.completed(ex))
+ {
+ outAsync = null;
+ }
}
+
if(outAsync != null)
{
- outAsync.finished(ex);
+ outAsync.invokeCompleted();
}
}
@@ -567,8 +572,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
// A map of outstanding requests that can be canceled. A request
// can be canceled if it has an invocation timeout, or we support
// interrupts.
- private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
- new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
+ private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests =
+ new java.util.HashMap<OutgoingAsyncBase, Integer>();
private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>();
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorFlushBatch.java
index ab1854cbce1..19e55ecc91c 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorFlushBatch.java
@@ -16,12 +16,29 @@ import java.util.concurrent.RejectedExecutionException;
import Ice.CommunicatorDestroyedException;
-public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
+public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
{
- public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
- CallbackBase callback)
+ public static CommunicatorFlushBatch check(Ice.AsyncResult r, Ice.Communicator com, String operation)
{
- super(communicator, instance, operation, callback);
+ check(r, operation);
+ if(!(r instanceof CommunicatorFlushBatch))
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ if(r.getCommunicator() != com)
+ {
+ throw new IllegalArgumentException("Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ return (CommunicatorFlushBatch)r;
+ }
+
+ public CommunicatorFlushBatch(Ice.Communicator communicator, Instance instance, String op, CallbackBase callback)
+ {
+ super(communicator, instance, op, callback);
+
+ _observer = ObserverHelper.get(instance, op);
//
// _useCount is initialized to 1 to prevent premature callbacks.
@@ -29,34 +46,22 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
// been initiated.
//
_useCount = 1;
-
- //
- // Assume all connections are flushed synchronously.
- //
- _sentSynchronously = true;
-
- //
- // Attach observer
- //
- _observer = ObserverHelper.get(instance, operation);
}
public void flushConnection(final Ice.ConnectionI con)
{
- class BatchOutgoingAsyncI extends BatchOutgoingAsync
+ class FlushBatch extends OutgoingAsyncBase
{
- public
- BatchOutgoingAsyncI()
+ public FlushBatch()
{
- super(CommunicatorBatchOutgoingAsync.this._communicator,
- CommunicatorBatchOutgoingAsync.this._instance,
- CommunicatorBatchOutgoingAsync.this._operation,
+ super(CommunicatorFlushBatch.this.getCommunicator(),
+ CommunicatorFlushBatch.this._instance,
+ CommunicatorFlushBatch.this.getOperation(),
null);
}
@Override
- public boolean
- sent()
+ public boolean sent()
{
if(_childObserver != null)
{
@@ -69,8 +74,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
// TODO: MJN: This is missing a test.
@Override
- public void
- finished(Ice.Exception ex)
+ public boolean completed(Ice.Exception ex)
{
if(_childObserver != null)
{
@@ -79,37 +83,23 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
_childObserver = null;
}
doCheck(false);
+ return false;
}
- @Override
- public void
- attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(CommunicatorBatchOutgoingAsync.this._observer != null)
- {
- _childObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt,
- requestId, size);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- @Override
- protected void cancelRequest()
+ @Override
+ protected Ice.Instrumentation.InvocationObserver getObserver()
{
+ return CommunicatorFlushBatch.this._observer;
}
}
- synchronized(_monitor)
+ synchronized(this)
{
++_useCount;
}
try
{
- int status;
if(_instance.queueRequests())
{
Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
@@ -117,7 +107,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
@Override
public Integer call() throws RetryException
{
- return con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
+ return con.flushAsyncBatchRequests(new FlushBatch());
}
});
@@ -126,7 +116,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
{
try
{
- status = future.get();
+ future.get();
if(interrupted)
{
Thread.currentThread().interrupt();
@@ -160,11 +150,7 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
}
else
{
- status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI());
- }
- if((status & AsyncStatus.Sent) > 0)
- {
- _sentSynchronously = false;
+ con.flushAsyncBatchRequests(new FlushBatch());
}
}
catch(Ice.LocalException ex)
@@ -181,53 +167,28 @@ public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
private void doCheck(boolean userThread)
{
- synchronized(_monitor)
+ synchronized(this)
{
assert(_useCount > 0);
if(--_useCount > 0)
{
return;
}
- _state |= StateDone | StateOK | StateSent;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _monitor.notifyAll();
}
- if(_callback == null || !_callback.__hasSentCallback())
- {
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- }
- else
+ if(sent(true))
{
- //
- // sentSynchronously_ is immutable here.
- //
- if(!_sentSynchronously || !userThread)
+ if(userThread)
{
- invokeSentAsync();
+ _sentSynchronously = true;
+ invokeSent();
}
else
{
- invokeSentInternal();
+ invokeSentAsync();
}
}
}
- @Override
- public void
- processRetry()
- {
- assert(false); // Retries are never scheduled
- }
-
- @Override
- protected void cancelRequest()
- {
- }
-
private int _useCount;
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 91957af1a0c..e4c1e6b1727 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -22,12 +22,12 @@ public class ConnectRequestHandler
this.os.swap(os);
}
- Request(OutgoingAsyncMessageCallback out)
+ Request(OutgoingAsyncBase out)
{
this.outAsync = out;
}
- OutgoingAsyncMessageCallback outAsync = null;
+ OutgoingAsyncBase outAsync = null;
BasicStream os = null;
}
@@ -149,7 +149,7 @@ public class ConnectRequestHandler
@Override
public int
- sendAsyncRequest(OutgoingAsyncMessageCallback out)
+ sendAsyncRequest(OutgoingAsyncBase out)
throws RetryException
{
synchronized(this)
@@ -159,6 +159,7 @@ public class ConnectRequestHandler
if(!initialized())
{
_requests.add(new Request(out));
+ out.cancelable(this);
return AsyncStatus.Queued;
}
}
@@ -171,14 +172,14 @@ public class ConnectRequestHandler
}
@Override
- public boolean
- asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
+ public void
+ asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
synchronized(this)
{
if(_exception != null)
{
- return false; // The request has been notified of a failure already.
+ return; // The request has been notified of a failure already.
}
if(!initialized())
@@ -190,14 +191,17 @@ public class ConnectRequestHandler
if(request.outAsync == outAsync)
{
it.remove();
- outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
}
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- return _connection.asyncRequestCanceled(outAsync, ex);
+ _connection.asyncRequestCanceled(outAsync, ex);
}
@Override
@@ -394,8 +398,7 @@ public class ConnectRequestHandler
_flushing = true;
}
- final java.util.List<OutgoingAsyncMessageCallback> sentCallbacks =
- new java.util.ArrayList<OutgoingAsyncMessageCallback>();
+ final java.util.List<OutgoingAsyncBase> sentCallbacks = new java.util.ArrayList<OutgoingAsyncBase>();
try
{
java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true
@@ -476,7 +479,7 @@ public class ConnectRequestHandler
@Override
public void run()
{
- for(OutgoingAsyncMessageCallback callback : sentCallbacks)
+ for(OutgoingAsyncBase callback : sentCallbacks)
{
callback.invokeSent();
}
@@ -546,7 +549,10 @@ public class ConnectRequestHandler
{
if(request.outAsync != null)
{
- request.outAsync.finished(_exception);
+ if(request.outAsync.completed(_exception))
+ {
+ request.outAsync.invokeCompleted();
+ }
}
}
_requests.clear();
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
deleted file mode 100644
index 5a1f0a30886..00000000000
--- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
+++ /dev/null
@@ -1,107 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-
-import Ice.CommunicatorDestroyedException;
-
-public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
-{
- public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance,
- String operation, CallbackBase callback)
- {
- super(communicator, instance, operation, callback);
- _connection = con;
- }
-
- public void __invoke()
- {
- int status;
- if(_instance.queueRequests())
- {
- Future<Integer> future = _instance.getQueueExecutor().submit(new Callable<Integer>()
- {
- @Override
- public Integer call() throws RetryException
- {
- return _connection.flushAsyncBatchRequests(ConnectionBatchOutgoingAsync.this);
- }
- });
-
- boolean interrupted = false;
- while(true)
- {
- try
- {
- status = future.get();
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- break;
- }
- catch(InterruptedException ex)
- {
- interrupted = true;
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch(ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException ex)
- {
- throw ex;
- }
- catch(Throwable ex)
- {
- assert(false);
- }
- }
- }
- }
- else
- {
- status = _connection.flushAsyncBatchRequests(this);
- }
-
- if((status & AsyncStatus.Sent) > 0)
- {
- _sentSynchronously = true;
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- invokeSent();
- }
- }
- }
-
- @Override
- public Ice.Connection getConnection()
- {
- return _connection;
- }
-
- @Override
- protected void cancelRequest()
- {
- _connection.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
- }
-
- private Ice.ConnectionI _connection;
-}
diff --git a/java/src/IceInternal/ConnectionFlushBatch.java b/java/src/IceInternal/ConnectionFlushBatch.java
new file mode 100644
index 00000000000..4b3da0bcb5e
--- /dev/null
+++ b/java/src/IceInternal/ConnectionFlushBatch.java
@@ -0,0 +1,128 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+
+public class ConnectionFlushBatch extends OutgoingAsyncBase
+{
+ public static ConnectionFlushBatch check(Ice.AsyncResult r, Ice.Connection con, String operation)
+ {
+ check(r, operation);
+ if(!(r instanceof ConnectionFlushBatch))
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ if(r.getConnection() != con)
+ {
+ throw new IllegalArgumentException("Connection for call to end_" + operation +
+ " does not match connection that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ return (ConnectionFlushBatch)r;
+ }
+
+ public ConnectionFlushBatch(Ice.ConnectionI con, Ice.Communicator communicator, Instance instance,
+ String operation, CallbackBase callback)
+ {
+ super(communicator, instance, operation, callback);
+ _connection = con;
+ }
+
+ @Override
+ public Ice.Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public void invoke()
+ {
+ try
+ {
+ int status;
+ if(_instance.queueRequests())
+ {
+ Future<Integer> future = _instance.getQueueExecutor().submit(
+ new Callable<Integer>()
+ {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return _connection.flushAsyncBatchRequests(ConnectionFlushBatch.this);
+ }
+ });
+
+ boolean interrupted = false;
+ while(true)
+ {
+ try
+ {
+ status = future.get();
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ break;
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch(ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+ }
+ else
+ {
+ status = _connection.flushAsyncBatchRequests(this);
+ }
+
+ if((status & AsyncStatus.Sent) > 0)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ if(completed(ex))
+ {
+ invokeCompletedAsync();
+ }
+ }
+ }
+
+ private Ice.ConnectionI _connection;
+}
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index f7ca7bb5b29..924b633e670 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -69,17 +69,17 @@ public class ConnectionRequestHandler implements RequestHandler
}
@Override
- public int sendAsyncRequest(OutgoingAsyncMessageCallback out)
+ public int sendAsyncRequest(OutgoingAsyncBase out)
throws RetryException
{
return out.send(_connection, _compress, _response);
}
@Override
- public boolean
- asyncRequestCanceled(OutgoingAsyncMessageCallback outgoingAsync, Ice.LocalException ex)
+ public void
+ asyncRequestCanceled(OutgoingAsyncBase outgoingAsync, Ice.LocalException ex)
{
- return _connection.asyncRequestCanceled(outgoingAsync, ex);
+ _connection.asyncRequestCanceled(outgoingAsync, ex);
}
@Override
@@ -103,8 +103,8 @@ public class ConnectionRequestHandler implements RequestHandler
return _connection;
}
- public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection,
- boolean compress) {
+ public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress)
+ {
_reference = ref;
_response = _reference.getMode() == Reference.ModeTwoway;
_connection = connection;
diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java
deleted file mode 100644
index 55653630ca2..00000000000
--- a/java/src/IceInternal/GetConnectionOutgoingAsync.java
+++ /dev/null
@@ -1,148 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public class GetConnectionOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
-{
- public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase cb)
- {
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
- _proxy = (Ice.ObjectPrxHelperBase) prx;
- _cnt = 0;
- _observer = ObserverHelper.get(prx, operation);
- }
-
- public void __invoke()
- {
- while(true)
- {
- try
- {
- _handler = _proxy.__getRequestHandler();
- _handler.sendAsyncRequest(this);
- }
- catch(RetryException ex)
- {
- _proxy.__setRequestHandler(_handler, null);
- }
- catch(Ice.Exception ex)
- {
- handleException(ex);
- }
- break;
- }
- }
-
- @Override
- public Ice.ObjectPrx getProxy()
- {
- return _proxy;
- }
-
- @Override
- public int send(Ice.ConnectionI conection, boolean compress, boolean response)
- throws RetryException
- {
- sent();
- return 0;
- }
-
- @Override
- public int invokeCollocated(CollocatedRequestHandler handler)
- {
- sent();
- return 0;
- }
-
- @Override
- public boolean sent()
- {
- synchronized(_monitor)
- {
- _state |= StateDone;
- _monitor.notifyAll();
- }
- invokeCompleted();
- return false;
- }
-
- @Override
- public void invokeSent()
- {
- // No sent callback
- }
-
- @Override
- public void finished(Ice.Exception exc)
- {
- try
- {
- handleException(exc);
- }
- catch(Ice.Exception ex)
- {
- invokeExceptionAsync(ex);
- }
- }
-
- @Override
- void processRetry()
- {
- __invoke();
- }
-
- @Override
- public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
- {
- threadPool.dispatch(new DispatchWorkItem(connection)
- {
- @Override
- public void run()
- {
- GetConnectionOutgoingAsync.this.finished(ex);
- }
- });
- }
-
- @Override
- protected void cancelRequest()
- {
- if(_handler != null)
- {
- _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
- }
- }
-
- private void handleException(Ice.Exception exc)
- {
- try
- {
- Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
- _cnt = _proxy.__handleException(exc, _handler, Ice.OperationMode.Idempotent, false, interval, _cnt);
- if(_observer != null)
- {
- _observer.retried(); // Invocation is being retried
- }
- _instance.retryQueue().add(this, interval.value);
- }
- catch(Ice.Exception ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- throw ex;
- }
- }
-
- private Ice.ObjectPrxHelperBase _proxy;
- private RequestHandler _handler = null;
- private int _cnt;
-}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 89dcf9b44ed..31306a14ca6 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -170,7 +170,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
public void
- flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
+ flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
{
for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here.
{
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index 63c3e592669..a723537cd7e 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -218,7 +218,7 @@ public final class ObjectAdapterFactory
}
public void
- flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
+ flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
{
java.util.List<Ice.ObjectAdapterI> adapters;
synchronized(this)
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index ef4d3d7c959..e42c817b42c 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,41 +9,47 @@
package IceInternal;
-public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
+public class OutgoingAsync extends ProxyOutgoingAsyncBase
{
+ public static OutgoingAsync check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (OutgoingAsync)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
- _proxy = (Ice.ObjectPrxHelperBase) prx;
+ super((Ice.ObjectPrxHelperBase)prx, operation, cb);
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ _is = null;
}
- public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is,
- IceInternal.BasicStream os)
+ public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, BasicStream is, BasicStream os)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb,
- is, os);
- _proxy = (Ice.ObjectPrxHelperBase) prx;
+ super((Ice.ObjectPrxHelperBase)prx, operation, cb, os);
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ _is = is;
}
public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
boolean explicitCtx, boolean synchronous)
{
- _handler = null;
- _cnt = 0;
- _sent = false;
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
+
_mode = mode;
- _sentSynchronously = false;
_synchronous = synchronous;
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
-
if(explicitCtx && ctx == null)
{
ctx = _emptyContext;
}
-
_observer = ObserverHelper.get(_proxy, operation, ctx);
switch(_proxy.__reference().getMode())
@@ -137,12 +143,6 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
}
@Override
- public Ice.ObjectPrx getProxy()
- {
- return _proxy;
- }
-
- @Override
public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
_cachedConnection = connection;
@@ -158,117 +158,52 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
// Disable caching by marking the streams as cached!
_state |= StateCachedBuffers;
}
- handler.invokeAsyncRequest(this, _synchronous);
- return AsyncStatus.Queued;
+ return handler.invokeAsyncRequest(this, _synchronous);
}
@Override
- public boolean sent()
+ public void abort(Ice.Exception ex)
{
- synchronized(_monitor)
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- boolean alreadySent = (_state & StateSent) != 0;
- _state |= StateSent;
- _sent = true;
-
- assert ((_state & StateDone) == 0);
-
- if(!_proxy.ice_isTwoway())
+ if(_handler != null)
{
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- if(_observer != null && (_callback == null || !_callback.__hasSentCallback()))
- {
- _observer.detach();
- _observer = null;
- }
- if(_timeoutRequestHandler != null)
- {
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
- }
- _state |= StateDone | StateOK;
- // _os.resize(0, false); // Don't clear the buffer now, it's
- // needed for the collocation optimization
-
- // For oneway requests after the data has been sent the buffers
- // can be reused unless this is a collocated invocation. For
- // collocated invocations the buffer won't be reused as the
- // because it has already been marked as cached in
- // invokeCollocated.
- cacheMessageBuffers();
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _handler.abortBatchRequest();
}
- _monitor.notifyAll();
-
- // Don't call the sent call is already sent.
- return !alreadySent && _callback != null && _callback.__hasSentCallback();
}
- }
- @Override
- public void invokeSent()
- {
- invokeSentInternal();
+ super.abort(ex);
}
- @Override
- public void finished(Ice.Exception exc)
+ public void invoke()
{
- synchronized(_monitor)
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- assert ((_state & StateDone) == 0);
- if(_childObserver != null)
- {
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- if(_timeoutRequestHandler != null)
+ if(_handler != null)
{
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
+ _sentSynchronously = true;
+ _handler.finishBatchRequest(_os);
+ finished(true);
}
+ return; // Don't call sent/completed callback for batch AMI requests
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads
- // should be calling on the callback.
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
//
- try
- {
- handleException(exc);
- }
- catch(Ice.Exception ex)
- {
- invokeException(ex);
- }
- }
-
- @Override
- void processRetry()
- {
- invoke(false);
- }
-
- @Override
- public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
- {
- threadPool.dispatch(new DispatchWorkItem(connection)
- {
- @Override
- public void run()
- {
- OutgoingAsync.this.finished(ex);
- }
- });
+ invokeImpl(true); // userThread = true
}
- public final boolean finished(BasicStream is)
+ public final boolean completed(BasicStream is)
{
//
// NOTE: this method is called from ConnectionI.parseMessage
@@ -276,291 +211,153 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
// any user callbacks.
//
- assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
-
+ assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
+
+ if(_childObserver != null)
+ {
+ _childObserver.reply(is.size() - Protocol.headerSize - 4);
+ _childObserver.detach();
+ _childObserver = null;
+ }
+
byte replyStatus;
try
{
- synchronized(_monitor)
+ // _is can already be initialized if the invocation is retried
+ if(_is == null)
+ {
+ _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ }
+ _is.swap(is);
+ replyStatus = _is.readByte();
+
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyOK:
+ {
+ break;
+ }
+
+ case ReplyStatus.replyUserException:
{
- assert (_exception == null && (_state & StateDone) == 0);
- if(_childObserver != null)
+ if(_observer != null)
{
- _childObserver.reply(is.size() - Protocol.headerSize - 4);
- _childObserver.detach();
- _childObserver = null;
+ _observer.userException();
}
-
- if(_timeoutRequestHandler != null)
+ break;
+ }
+
+ case ReplyStatus.replyObjectNotExist:
+ case ReplyStatus.replyFacetNotExist:
+ case ReplyStatus.replyOperationNotExist:
+ {
+ Ice.Identity id = new Ice.Identity();
+ id.__read(_is);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ String[] facetPath = _is.readStringSeq();
+ String facet;
+ if(facetPath.length > 0)
{
- _future.cancel(false);
- _future = null;
- _timeoutRequestHandler = null;
+ if(facetPath.length > 1)
+ {
+ throw new Ice.MarshalException();
+ }
+ facet = facetPath[0];
}
-
- // _is can already be initialized if the invocation is retried
- if(_is == null)
+ else
{
- _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ facet = "";
}
- _is.swap(is);
- replyStatus = _is.readByte();
-
+
+ String operation = _is.readString();
+
+ Ice.RequestFailedException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyOK:
- {
- break;
- }
-
- case ReplyStatus.replyUserException:
- {
- if(_observer != null)
- {
- _observer.userException();
- }
- break;
- }
-
- case ReplyStatus.replyObjectNotExist:
- case ReplyStatus.replyFacetNotExist:
- case ReplyStatus.replyOperationNotExist:
- {
- Ice.Identity id = new Ice.Identity();
- id.__read(_is);
-
- //
- // For compatibility with the old FacetPath.
- //
- String[] facetPath = _is.readStringSeq();
- String facet;
- if(facetPath.length > 0)
- {
- if(facetPath.length > 1)
- {
- throw new Ice.MarshalException();
- }
- facet = facetPath[0];
- }
- else
- {
- facet = "";
- }
-
- String operation = _is.readString();
-
- Ice.RequestFailedException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
-
- case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
-
- case ReplyStatus.replyOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
-
- default:
- {
- assert (false);
- break;
- }
- }
-
- ex.id = id;
- ex.facet = facet;
- ex.operation = operation;
- throw ex;
- }
-
- case ReplyStatus.replyUnknownException:
- case ReplyStatus.replyUnknownLocalException:
- case ReplyStatus.replyUnknownUserException:
- {
- String unknown = _is.readString();
-
- Ice.UnknownException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
-
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
-
- case ReplyStatus.replyUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
-
- default:
- {
- assert (false);
- break;
- }
- }
-
- ex.unknown = unknown;
- throw ex;
- }
+ case ReplyStatus.replyObjectNotExist:
+ {
+ ex = new Ice.ObjectNotExistException();
+ break;
+ }
- default:
- {
- throw new Ice.UnknownReplyStatusException();
- }
+ case ReplyStatus.replyFacetNotExist:
+ {
+ ex = new Ice.FacetNotExistException();
+ break;
}
- if(replyStatus == ReplyStatus.replyOK)
+ case ReplyStatus.replyOperationNotExist:
{
- _state |= StateOK;
+ ex = new Ice.OperationNotExistException();
+ break;
}
- _state |= StateDone;
- _monitor.notifyAll();
- if(_callback == null)
+ default:
{
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return false;
+ assert(false);
+ break;
}
- return true;
- }
- }
- catch(Ice.Exception exc)
- {
- //
- // We don't call finished(exc) here because we don't want
- // to invoke the completion callback. The completion
- // callback is invoked by the connection is this method
- // returns true.
- //
- try
- {
- handleException(exc);
- return false;
+ }
+
+ ex.id = id;
+ ex.facet = facet;
+ ex.operation = operation;
+ throw ex;
}
- catch(Ice.LocalException ex)
+
+ case ReplyStatus.replyUnknownException:
+ case ReplyStatus.replyUnknownLocalException:
+ case ReplyStatus.replyUnknownUserException:
{
- synchronized(_monitor)
- {
- _state |= StateDone;
- _exception = ex;
- _monitor.notifyAll();
+ String unknown = _is.readString();
- if(_callback == null)
- {
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return false;
- }
- return true;
+ Ice.UnknownException ex = null;
+ switch(replyStatus)
+ {
+ case ReplyStatus.replyUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
}
- }
- }
- }
- public final boolean invoke(boolean userThread)
- {
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _state |= StateDone | StateOK;
- _handler.finishBatchRequest(_os);
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- return true;
- }
+ case ReplyStatus.replyUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
- while(true)
- {
- try
- {
- _sent = false;
- _handler = _proxy.__getRequestHandler();
- int status = _handler.sendAsyncRequest(this);
- if((status & AsyncStatus.Sent) > 0)
+ case ReplyStatus.replyUnknownUserException:
{
- if(userThread)
- {
- _sentSynchronously = true;
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- invokeSent(); // Call from the user thread.
- }
- }
- else
- {
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- // Call from a client thread pool thread.
- invokeSentAsync();
- }
- }
+ ex = new Ice.UnknownUserException();
+ break;
}
- if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0)
+ default:
{
- synchronized(_monitor)
- {
- if((_state & StateDone) == 0)
- {
- int invocationTimeout = _handler.getReference().getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _future = _instance.timer().schedule(new Runnable()
- {
- @Override
- public void run()
- {
- timeout();
- }
- }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- _timeoutRequestHandler = _handler;
- }
- }
- }
+ assert(false);
+ break;
+ }
}
+
+ ex.unknown = unknown;
+ throw ex;
}
- catch(RetryException ex)
+
+ default:
{
- // Clear request handler and retry.
- _proxy.__setRequestHandler(_handler, null);
- continue;
+ throw new Ice.UnknownReplyStatusException();
}
- catch(Ice.Exception ex)
- {
- // This will throw if the invocation can't be retried.
- handleException(ex);
}
- break;
+
+ return finished(replyStatus == ReplyStatus.replyOK);
+ }
+ catch(Ice.Exception ex)
+ {
+ return completed(ex);
}
- return _sentSynchronously;
}
public BasicStream startWriteParams(Ice.FormatType format)
@@ -591,12 +388,48 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
}
}
+ public IceInternal.BasicStream startReadParams()
+ {
+ _is.startReadEncaps();
+ return _is;
+ }
+
+ public void endReadParams()
+ {
+ _is.endReadEncaps();
+ }
+
+ public void readEmptyParams()
+ {
+ _is.skipEmptyEncaps(null);
+ }
+
+ public byte[] readParamEncaps()
+ {
+ return _is.readEncaps(null);
+ }
+
+ public final void throwUserException()
+ throws Ice.UserException
+ {
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException(null);
+ }
+ catch(Ice.UserException ex)
+ {
+ _is.endReadEncaps();
+ throw ex;
+ }
+ }
+
@Override
public void cacheMessageBuffers()
{
if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0)
{
- synchronized(_monitor)
+ synchronized(this)
{
if((_state & StateCachedBuffers) > 0)
{
@@ -612,76 +445,14 @@ public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMes
_os.reset();
_proxy.cacheMessageBuffers(_is, _os);
- }
- }
- @Override
- public void invokeExceptionAsync(final Ice.Exception ex)
- {
- if((_state & StateDone) == 0 && _handler != null)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _handler.abortBatchRequest();
- }
- }
-
- super.invokeExceptionAsync(ex);
- }
-
- @Override
- protected void cancelRequest()
- {
- if(_handler != null)
- {
- _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
+ _is = null;
+ _os = null;
}
}
- private void handleException(Ice.Exception exc)
- {
- try
- {
- Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
- _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
- if(_observer != null)
- {
- _observer.retried(); // Invocation is being retried.
- }
-
- //
- // Schedule the retry. Note that we always schedule the retry
- // on the retry queue even if the invocation can be retried
- // immediately. This is required because it might not be safe
- // to retry from this thread (this is for instance called by
- // finished(BasicStream) which is called with the connection
- // locked.
- //
- _instance.retryQueue().add(this, interval.value);
- }
- catch(Ice.Exception ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- throw ex;
- }
- }
-
- final private Ice.ObjectPrxHelperBase _proxy;
final private Ice.EncodingVersion _encoding;
-
- private RequestHandler _handler;
- private int _cnt;
- private Ice.OperationMode _mode;
- private boolean _sent;
+ private BasicStream _is;
//
// If true this AMI request is being used for a generated synchronous invocation.
diff --git a/java/src/IceInternal/OutgoingAsyncBase.java b/java/src/IceInternal/OutgoingAsyncBase.java
index 2e6aa221f4d..f04b09ea941 100644
--- a/java/src/IceInternal/OutgoingAsyncBase.java
+++ b/java/src/IceInternal/OutgoingAsyncBase.java
@@ -9,385 +9,41 @@
package IceInternal;
-import Ice.AsyncResult;
-import Ice.Communicator;
-import Ice.CommunicatorDestroyedException;
-import Ice.Connection;
-import Ice.ObjectPrx;
-import Ice.UserException;
-
-/**
- * An AsyncResult object is the return value of an asynchronous invocation.
- * With this object, an application can obtain several attributes of the
- * invocation and discover its outcome.
- **/
-public abstract class OutgoingAsyncBase implements Ice.AsyncResult
+//
+// Base class for handling asynchronous invocations. This class is
+// responsible for the handling of the output stream and the child
+// invocation observer.
+//
+public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI
{
- protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op,
- IceInternal.CallbackBase del)
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
- _communicator = communicator;
- _instance = instance;
- _operation = op;
- _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
- _state = 0;
- _sentSynchronously = false;
- _exception = null;
- _callback = del;
- }
-
- protected OutgoingAsyncBase(Communicator communicator, Instance instance, String op, CallbackBase del,
- BasicStream is, BasicStream os)
- {
- _communicator = communicator;
- _instance = instance;
- _operation = op;
- _os = os;
- _is = is;
- _state = 0;
- _sentSynchronously = false;
- _exception = null;
- _callback = del;
- }
-
- /**
- * Returns the communicator that sent the invocation.
- *
- * @return The communicator.
- **/
- @Override
- public Communicator getCommunicator()
- {
- return _communicator;
- }
-
- /**
- * Returns the connection that was used for the invocation.
- *
- * @return The connection.
- **/
- @Override
- public Connection getConnection()
- {
- return null;
- }
-
- /**
- * Returns the proxy that was used to call the <code>begin_</code> method.
- *
- * @return The proxy.
- **/
- @Override
- public ObjectPrx getProxy()
- {
- return null;
- }
-
- /**
- * Indicates whether the result of an invocation is available.
- *
- * @return True if the result is available, which means a call to the <code>end_</code>
- * method will not block. The method returns false if the result is not yet available.
- **/
- @Override
- public final boolean isCompleted()
- {
- synchronized(_monitor)
- {
- return (_state & StateDone) > 0;
- }
+ assert(false); // This should be overriden if this object is used with a request handler
+ return AsyncStatus.Queued;
}
- /**
- * Blocks the caller until the result of the invocation is available.
- **/
- @Override
- public final void waitForCompleted()
+ public int invokeCollocated(CollocatedRequestHandler handler)
{
- synchronized(_monitor)
- {
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
- while((_state & StateDone) == 0)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
+ assert(false); // This should be overriden if this object is used with a request handler
+ return AsyncStatus.Queued;
}
- /**
- * When you call the <code>begin_</code> method, the Ice run time attempts to
- * write the corresponding request to the client-side transport. If the
- * transport cannot accept the request, the Ice run time queues the request
- * for later transmission. This method returns true if, at the time it is called,
- * the request has been written to the local transport (whether it was initially
- * queued or not). Otherwise, if the request is still queued, this method returns
- * false.
- *
- * @return True if the request has been sent, or false if the request is queued.
- **/
- @Override
- public final boolean isSent()
+ public boolean sent()
{
- synchronized(_monitor)
- {
- return (_state & StateSent) > 0;
- }
+ return sent(true);
}
- /**
- * Blocks the caller until the request has been written to the client-side transport.
- **/
- @Override
- public final void waitForSent()
+ public boolean completed(Ice.Exception ex)
{
- synchronized(_monitor)
- {
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
- while((_state & StateSent) == 0 && _exception == null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
+ return finished(ex);
}
- /**
- * If the invocation failed with a local exception, throws the local exception.
- **/
- @Override
- public final void throwLocalException()
+ public final void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
{
- synchronized(_monitor)
- {
- if(_exception != null)
- {
- throw _exception;
- }
- }
- }
-
- /**
- * This method returns true if a request was written to the client-side
- * transport without first being queued. If the request was initially
- * queued, this method returns false (independent of whether the request
- * is still in the queue or has since been written to the client-side transport).
- *
- * @return True if the request was sent without being queued, or false
- * otherwise.
- **/
- @Override
- public final boolean sentSynchronously()
- {
- return _sentSynchronously; // No lock needed, immutable once __send() is called
- }
-
- /**
- * Returns the name of the operation.
- *
- * @return The operation name.
- **/
- @Override
- public final String getOperation()
- {
- return _operation;
- }
-
- public final IceInternal.BasicStream getOs()
- {
- return _os;
- }
-
- public IceInternal.BasicStream
- startReadParams()
- {
- _is.startReadEncaps();
- return _is;
- }
-
- public void
- endReadParams()
- {
- _is.endReadEncaps();
- }
-
- public void
- readEmptyParams()
- {
- _is.skipEmptyEncaps(null);
- }
-
- public byte[]
- readParamEncaps()
- {
- return _is.readEncaps(null);
- }
-
- public final boolean __wait()
- {
- try
- {
- synchronized(_monitor)
- {
- if((_state & StateEndCalled) > 0)
- {
- throw new java.lang.IllegalArgumentException("end_ method called more than once");
- }
-
- _state |= StateEndCalled;
- if(Thread.interrupted())
- {
- throw new InterruptedException();
- }
- while((_state & StateDone) == 0)
- {
- _monitor.wait();
- }
-
- if(_exception != null)
- {
- throw (Ice.Exception)_exception.fillInStackTrace();
- }
-
- return (_state & StateOK) > 0;
- }
- }
- catch(InterruptedException ex)
- {
- // This must be called outside of the monitor as the
- // invocation will potentially want to lock the
- // connection (which in turn may want to lock the outgoing
- // to notify that the message has been sent).
- cancelRequest();
- throw new Ice.OperationInterruptedException();
- }
- }
-
- public final void throwUserException()
- throws UserException
- {
- try
- {
- _is.startReadEncaps();
- _is.throwException(null);
- }
- catch(UserException ex)
- {
- _is.endReadEncaps();
- throw ex;
- }
- }
-
- public void invokeExceptionAsync(final Ice.Exception ex)
- {
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void
- run()
- {
- invokeException(ex);
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly.
- }
- }
-
- public final void invokeException(Ice.Exception ex)
- {
- synchronized(_monitor)
- {
- _state |= StateDone;
- //_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception = ex;
- _monitor.notifyAll();
- }
-
- invokeCompleted();
- }
-
- protected final void invokeSentInternal()
- {
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__sent(this);
- }
- catch(RuntimeException ex)
- {
- warning(ex);
- }
- catch(AssertionError exc)
- {
- error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- error(exc);
- }
- finally
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
- }
-
if(_observer != null)
{
- Ice.ObjectPrx proxy = getProxy();
- if(proxy == null || !proxy.ice_isTwoway())
- {
- _observer.detach();
- }
- }
- }
-
- public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size);
+ final int size = _os.size() - IceInternal.Protocol.headerSize - 4;
+ _childObserver = getObserver().getRemoteObserver(info, endpt, requestId, size);
if(_childObserver != null)
{
_childObserver.attach();
@@ -395,13 +51,12 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult
}
}
- void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
+ public final void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
{
if(_observer != null)
{
- _childObserver = _observer.getCollocatedObserver(adapter,
- requestId,
- _os.size() - IceInternal.Protocol.headerSize - 4);
+ final int size = _os.size() - IceInternal.Protocol.headerSize - 4;
+ _childObserver = getObserver().getCollocatedObserver(adapter, requestId, size);
if(_childObserver != null)
{
_childObserver.attach();
@@ -409,185 +64,49 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult
}
}
- abstract void processRetry();
-
- final protected void invokeSentAsync()
- {
- //
- // This is called when it's not safe to call the sent callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void run()
- {
- invokeSentInternal();
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- }
- }
-
- public static void check(AsyncResult r, ObjectPrx prx, String operation)
- {
- check(r, operation);
- if(r.getProxy() != prx)
- {
- throw new IllegalArgumentException("Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
- }
- }
-
- public static void check(AsyncResult r, Connection con, String operation)
+ public final IceInternal.BasicStream getOs()
{
- check(r, operation);
- if(r.getConnection() != con)
- {
- throw new IllegalArgumentException("Connection for call to end_" + operation +
- " does not match connection that was used to call corresponding begin_" +
- operation + " method");
- }
+ return _os;
}
- public static void check(AsyncResult r, Communicator com, String operation)
+ protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del)
{
- check(r, operation);
- if(r.getCommunicator() != com)
- {
- throw new IllegalArgumentException("Communicator for call to end_" + operation +
- " does not match communicator that was used to call corresponding " +
- "begin_" + operation + " method");
- }
+ super(com, instance, op, del);
+ _os = new BasicStream(instance, Protocol.currentProtocolEncoding);
}
- public void cacheMessageBuffers()
+ protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del, BasicStream os)
{
+ super(com, instance, op, del);
+ _os = os;
}
- public final void invokeCompleted()
+ @Override
+ protected boolean sent(boolean done)
{
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
+ if(done)
{
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__completed(this);
- }
- catch(RuntimeException ex)
- {
- warning(ex);
- }
- catch(AssertionError exc)
- {
- error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- error(exc);
- }
- finally
+ if(_childObserver != null)
{
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
+ _childObserver.detach();
+ _childObserver = null;
}
}
-
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
+ return super.sent(done);
}
- protected void
- timeout()
- {
- IceInternal.RequestHandler handler;
- synchronized(_monitor)
- {
- handler = _timeoutRequestHandler;
- _timeoutRequestHandler = null;
- }
-
- if(handler != null)
- {
- handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this,
- new Ice.InvocationTimeoutException());
- }
- }
-
- private static void check(AsyncResult r, String operation)
- {
- if(r == null)
- {
- throw new IllegalArgumentException("AsyncResult == null");
- }
- else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality
- {
- throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " +
- r.getOperation());
- }
- }
-
- private final void warning(RuntimeException ex)
+ @Override
+ protected boolean finished(Ice.Exception ex)
{
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ if(_childObserver != null)
{
- String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex);
- _instance.initializationData().logger.warning(s);
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
}
+ return super.finished(ex);
}
- private final void error(Error error)
- {
- String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
- _instance.initializationData().logger.error(s);
- }
-
- abstract protected void cancelRequest();
-
- protected Communicator _communicator;
- protected IceInternal.Instance _instance;
- protected String _operation;
- protected Ice.Connection _cachedConnection;
-
- protected java.lang.Object _monitor = new java.lang.Object();
- protected IceInternal.BasicStream _is;
- protected IceInternal.BasicStream _os;
-
- protected IceInternal.RequestHandler _timeoutRequestHandler;
- protected java.util.concurrent.Future<?> _future;
-
- protected static final byte StateOK = 0x1;
- protected static final byte StateDone = 0x2;
- protected static final byte StateSent = 0x4;
- protected static final byte StateEndCalled = 0x8;
- protected static final byte StateCachedBuffers = 0x10;
-
- protected byte _state;
- protected boolean _sentSynchronously;
- protected Ice.Exception _exception;
-
- protected Ice.Instrumentation.InvocationObserver _observer;
+ protected BasicStream _os;
protected Ice.Instrumentation.ChildInvocationObserver _childObserver;
-
- protected IceInternal.CallbackBase _callback;
}
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
deleted file mode 100644
index 7b1f5972434..00000000000
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ /dev/null
@@ -1,53 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-//
-// This interface is used by the connection to handle OutgoingAsync
-// and BatchOutgoingAsync messages.
-//
-public interface OutgoingAsyncMessageCallback
-{
- //
- // Called by the request handler to send the request over the connection.
- //
- int send(Ice.ConnectionI conection, boolean compress, boolean response)
- throws RetryException;
-
- //
- // Called by the collocated request handler to invoke the request.
- //
- int invokeCollocated(CollocatedRequestHandler handler);
-
- //
- // Called by the connection when the message is confirmed sent. The
- // connection is locked when this is called so this method can't call the
- // sent callback. Instead, this method returns true if there's a sent
- // callback and false otherwise. If true is returned, the connection will
- // call the __invokeSent() method bellow (which in turn should call the sent
- // callback).
- //
- boolean sent();
-
- //
- // Called by the connection to call the user sent callback.
- //
- void invokeSent();
-
- //
- // Called by the connection when the request failed.
- //
- void finished(Ice.Exception ex);
-
- //
- // Helper to dispatch the cancellation exception.
- //
- void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
-}
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 534888ec89e..116038ad537 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -265,7 +265,7 @@ public final class OutgoingConnectionFactory
}
public void
- flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
+ flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
{
java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>();
diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
deleted file mode 100644
index 4a575142da7..00000000000
--- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java
+++ /dev/null
@@ -1,97 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
-{
- public ProxyBatchOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback)
- {
- super(prx.ice_getCommunicator(), prx.__reference().getInstance(), operation, callback);
- _proxy = prx;
- _observer = ObserverHelper.get(prx, operation);
- }
-
- public void __invoke()
- {
- Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol());
-
- try
- {
- _handler = _proxy.__getRequestHandler();
- int status = _handler.sendAsyncRequest(this);
- if((status & AsyncStatus.Sent) > 0)
- {
- _sentSynchronously = true;
- if((status & AsyncStatus.InvokeSentCallback) > 0)
- {
- invokeSent();
- }
- }
- else
- {
- synchronized(_monitor)
- {
- if((_state & StateDone) == 0)
- {
- int invocationTimeout = _handler.getReference().getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- _future = _instance.timer().schedule(new Runnable()
- {
- @Override
- public void run()
- {
- timeout();
- }
- }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- _timeoutRequestHandler = _handler;
- }
- }
- }
- }
- }
- catch(RetryException ex)
- {
- //
- // Clear request handler but don't retry or throw. Retrying
- // isn't useful, there were no batch requests associated with
- // the proxy's request handler.
- //
- _proxy.__setRequestHandler(_handler, null);
- }
- catch(Ice.Exception ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- _proxy.__setRequestHandler(_handler, null); // Clear request handler
- throw ex; // Throw to notify the user lthat batch requests were potentially lost.
- }
- }
-
- @Override
- public Ice.ObjectPrx getProxy()
- {
- return _proxy;
- }
-
- @Override
- protected void cancelRequest()
- {
- if(_handler != null)
- {
- _handler.asyncRequestCanceled(this, new Ice.OperationInterruptedException());
- }
- }
-
- final private Ice.ObjectPrxHelperBase _proxy;
- private RequestHandler _handler = null;
-}
diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java
index fb664c89b9b..a5165c37374 100644
--- a/java/src/IceInternal/ProxyFactory.java
+++ b/java/src/IceInternal/ProxyFactory.java
@@ -217,7 +217,7 @@ public final class ProxyFactory
//
// Don't retry invocation timeouts.
//
- if(ex instanceof Ice.InvocationTimeoutException)
+ if(ex instanceof Ice.InvocationTimeoutException || ex instanceof Ice.InvocationCanceledException)
{
throw ex;
}
diff --git a/java/src/IceInternal/ProxyFlushBatch.java b/java/src/IceInternal/ProxyFlushBatch.java
new file mode 100644
index 00000000000..d3708dc67e3
--- /dev/null
+++ b/java/src/IceInternal/ProxyFlushBatch.java
@@ -0,0 +1,71 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class ProxyFlushBatch extends ProxyOutgoingAsyncBase
+{
+ public static ProxyFlushBatch check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (ProxyFlushBatch)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
+ public ProxyFlushBatch(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback)
+ {
+ super(prx, operation, callback);
+ _observer = ObserverHelper.get(prx, operation);
+ }
+
+ @Override
+ public boolean sent()
+ {
+ return sent(true); // Overriden because the flush is done even if using a two-way proxy.
+ }
+
+ @Override
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response)
+ {
+ _cachedConnection = connection;
+ return connection.flushAsyncBatchRequests(this);
+ }
+
+ @Override
+ public int invokeCollocated(CollocatedRequestHandler handler)
+ {
+ return handler.invokeAsyncBatchRequests(this);
+ }
+
+ public void invoke()
+ {
+ Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
+ invokeImpl(true); // userThread = true
+ }
+
+ @Override
+ protected void handleRetryException(RetryException exc)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler
+ throw exc.get(); // No retries, we want to notify the user of potentially lost batch requests
+ }
+
+ @Override
+ protected int handleException(Ice.Exception exc)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler
+ throw exc; // No retries, we want to notify the user of potentially lost batch requests
+ }
+}
diff --git a/java/src/IceInternal/ProxyGetConnection.java b/java/src/IceInternal/ProxyGetConnection.java
new file mode 100644
index 00000000000..9f5388218db
--- /dev/null
+++ b/java/src/IceInternal/ProxyGetConnection.java
@@ -0,0 +1,59 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class ProxyGetConnection extends ProxyOutgoingAsyncBase
+{
+ public static ProxyGetConnection check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (ProxyGetConnection)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
+ public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase cb)
+ {
+ super(prx, operation, cb);
+ _observer = ObserverHelper.get(prx, operation);
+ }
+
+ @Override
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response)
+ throws RetryException
+ {
+ _cachedConnection = connection;
+ if(finished(true))
+ {
+ invokeCompletedAsync();
+ }
+ return AsyncStatus.Sent;
+ }
+
+ @Override
+ public int invokeCollocated(CollocatedRequestHandler handler)
+ {
+ if(finished(true))
+ {
+ invokeCompletedAsync();
+ }
+ return AsyncStatus.Sent;
+ }
+
+ public void invoke()
+ {
+ invokeImpl(true); // userThread = true
+ }
+}
diff --git a/java/src/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
new file mode 100644
index 00000000000..eca0de9fcf6
--- /dev/null
+++ b/java/src/IceInternal/ProxyOutgoingAsyncBase.java
@@ -0,0 +1,278 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+//
+// Base class for proxy based invocations. This class handles the
+// retry for proxy invocations. It also ensures the child observer is
+// correct notified of failures and make sure the retry task is
+// correctly canceled when the invocation completes.
+//
+public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
+{
+ public static ProxyOutgoingAsyncBase check(Ice.AsyncResult r, Ice.ObjectPrx prx, String operation)
+ {
+ ProxyOutgoingAsyncBase.checkImpl(r, prx, operation);
+ try
+ {
+ return (ProxyOutgoingAsyncBase)r;
+ }
+ catch(ClassCastException ex)
+ {
+ throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method");
+ }
+ }
+
+ @Override
+ public Ice.ObjectPrx getProxy()
+ {
+ return _proxy;
+ }
+
+ @Override
+ public boolean sent()
+ {
+ return sent(!_proxy.ice_isTwoway());
+ }
+
+ @Override
+ public boolean completed(Ice.Exception exc)
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
+ try
+ {
+ _instance.retryQueue().add(this, handleException(exc));
+ return false;
+ }
+ catch(Ice.Exception ex)
+ {
+ return finished(ex); // No retries, we're done
+ }
+ }
+
+ public void retry()
+ {
+ invokeImpl(false);
+ }
+
+ public void abort(Ice.Exception ex)
+ {
+ assert(_childObserver == null);
+ if(finished(ex))
+ {
+ invokeCompletedAsync();
+ }
+ else if(ex instanceof Ice.CommunicatorDestroyedException)
+ {
+ //
+ // If it's a communicator destroyed exception, don't swallow
+ // it but instead notify the user thread. Even if no callback
+ // was provided.
+ //
+ throw ex;
+ }
+ }
+
+ protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate)
+ {
+ super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate);
+ _proxy = prx;
+ _mode = Ice.OperationMode.Normal;
+ _cnt = 0;
+ _sent = false;
+ }
+
+ protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate, BasicStream os)
+ {
+ super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate, os);
+ _proxy = prx;
+ _mode = Ice.OperationMode.Normal;
+ _cnt = 0;
+ _sent = false;
+ }
+
+ protected static Ice.AsyncResult checkImpl(Ice.AsyncResult r, Ice.ObjectPrx p, String operation)
+ {
+ check(r, operation);
+ if(r.getProxy() != p)
+ {
+ throw new IllegalArgumentException("Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ return r;
+ }
+
+ protected void invokeImpl(boolean userThread)
+ {
+ try
+ {
+ if(userThread)
+ {
+ int invocationTimeout = _proxy.__reference().getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _future = _instance.timer().schedule(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cancel(new Ice.InvocationTimeoutException());
+ }
+ }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ }
+ else // If not called from the user thread, it's called from the retry queue
+ {
+ checkCanceled(); // Cancellation exception aren't retriable
+ if(_observer != null)
+ {
+ _observer.retried();
+ }
+ }
+
+ while(true)
+ {
+ try
+ {
+ _sent = false;
+ _handler = _proxy.__getRequestHandler();
+ int status = _handler.sendAsyncRequest(this);
+ if((status & AsyncStatus.Sent) > 0)
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ invokeSent(); // Call the sent callback from the user thread.
+ }
+ }
+ else
+ {
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ invokeSentAsync(); // Call the sent callback from a client thread pool thread.
+ }
+ }
+ }
+ return; // We're done!
+ }
+ catch(RetryException ex)
+ {
+ handleRetryException(ex);
+ }
+ catch(Ice.Exception ex)
+ {
+ if(_childObserver != null)
+ {
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+ final int interval = handleException(ex);
+ if(interval > 0)
+ {
+ _instance.retryQueue().add(this, interval);
+ return;
+ }
+ else if(_observer != null)
+ {
+ checkCanceled();
+ _observer.retried();
+ }
+ }
+ }
+ }
+ catch(Ice.Exception ex)
+ {
+ //
+ // If called from the user thread we re-throw, the exception
+ // will be catch by the caller and abort() will be called.
+ //
+ if(userThread)
+ {
+ throw ex;
+ }
+ else if(finished(ex)) // No retries, we're done
+ {
+ invokeCompletedAsync();
+ }
+ }
+ }
+
+ @Override
+ protected boolean sent(boolean done)
+ {
+ _sent = true;
+ if(done)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ }
+ return super.sent(done);
+ }
+
+ @Override
+ protected boolean finished(Ice.Exception ex)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ return super.finished(ex);
+ }
+
+ @Override
+ protected boolean finished(boolean ok)
+ {
+ if(_future != null)
+ {
+ _future.cancel(false);
+ _future = null;
+ }
+ return super.finished(ok);
+ }
+
+ protected void handleRetryException(RetryException exc)
+ {
+ _proxy.__setRequestHandler(_handler, null); // Clear request handler and always retry.
+ }
+
+ protected int handleException(Ice.Exception exc)
+ {
+ Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
+ _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
+ return interval.value;
+ }
+
+ final protected Ice.ObjectPrxHelperBase _proxy;
+ protected RequestHandler _handler;
+ protected Ice.OperationMode _mode;
+
+ private java.util.concurrent.Future<?> _future;
+ private int _cnt;
+ private boolean _sent;
+}
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index 5f40ea37403..fdb7a672e2a 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -124,7 +124,7 @@ public class QueueRequestHandler implements RequestHandler
@Override
public int
- sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
+ sendAsyncRequest(final OutgoingAsyncBase out) throws RetryException
{
try
{
@@ -148,15 +148,16 @@ public class QueueRequestHandler implements RequestHandler
}
@Override
- public boolean
- asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
+ public void
+ asyncRequestCanceled(final OutgoingAsyncBase outAsync, final Ice.LocalException ex)
{
- return performCallable(new Callable<Boolean>()
+ performCallable(new Callable<Void>()
{
@Override
- public Boolean call()
+ public Void call()
{
- return _delegate.asyncRequestCanceled(outAsync, ex);
+ _delegate.asyncRequestCanceled(outAsync, ex);
+ return null;
}
});
}
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index 1e0e4a7909a..f87b215919f 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -9,7 +9,7 @@
package IceInternal;
-public interface RequestHandler
+public interface RequestHandler extends CancellationHandler
{
RequestHandler connect();
RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler);
@@ -19,11 +19,9 @@ public interface RequestHandler
void finishBatchRequest(BasicStream out);
void abortBatchRequest();
- int sendAsyncRequest(OutgoingAsyncMessageCallback out)
+ int sendAsyncRequest(OutgoingAsyncBase out)
throws RetryException;
- boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex);
-
Reference getReference();
Ice.ConnectionI getConnection();
diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java
index 2c322d1cb72..82831529545 100644
--- a/java/src/IceInternal/RetryQueue.java
+++ b/java/src/IceInternal/RetryQueue.java
@@ -16,8 +16,7 @@ public class RetryQueue
_instance = instance;
}
- synchronized public void
- add(OutgoingAsyncBase outAsync, int interval)
+ synchronized public void add(ProxyOutgoingAsyncBase outAsync, int interval)
{
if(_instance == null)
{
@@ -26,10 +25,10 @@ public class RetryQueue
RetryTask task = new RetryTask(this, outAsync);
task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS));
_requests.add(task);
+ outAsync.cancelable(task);
}
- synchronized public void
- destroy()
+ synchronized public void destroy()
{
java.util.HashSet<RetryTask> keep = new java.util.HashSet<RetryTask>();
for(RetryTask task : _requests)
@@ -65,14 +64,14 @@ public class RetryQueue
}
}
- synchronized void
- remove(RetryTask task)
+ synchronized boolean remove(RetryTask task)
{
- _requests.remove(task);
+ boolean removed = _requests.remove(task);
if(_instance == null && _requests.isEmpty())
{
notify();
}
+ return removed;
}
private Instance _instance;
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
index 64a54bd45b5..974dc998a79 100644
--- a/java/src/IceInternal/RetryTask.java
+++ b/java/src/IceInternal/RetryTask.java
@@ -9,26 +9,18 @@
package IceInternal;
-class RetryTask implements Runnable
+class RetryTask implements Runnable, CancellationHandler
{
- RetryTask(RetryQueue queue, OutgoingAsyncBase outAsync)
+ RetryTask(RetryQueue queue, ProxyOutgoingAsyncBase outAsync)
{
_queue = queue;
_outAsync = outAsync;
}
@Override
- public void
- run()
+ public void run()
{
- try
- {
- _outAsync.processRetry();
- }
- catch(Ice.LocalException ex)
- {
- _outAsync.invokeExceptionAsync(ex);
- }
+ _outAsync.retry();
//
// NOTE: this must be called last, destroy() blocks until all task
@@ -39,12 +31,32 @@ class RetryTask implements Runnable
_queue.remove(this);
}
- public boolean
- destroy()
+ @Override
+ public void asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
+ {
+ if(_queue.remove(this) && _future.cancel(false))
+ {
+ //
+ // We just retry the outgoing async now rather than marking it
+ // as finished. The retry will check for the cancellation
+ // exception and terminate appropriately the request.
+ //
+ _outAsync.retry();
+ }
+ }
+
+ public boolean destroy()
{
if(_future.cancel(false))
{
- _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ try
+ {
+ _outAsync.abort(new Ice.CommunicatorDestroyedException());
+ }
+ catch(Ice.CommunicatorDestroyedException ex)
+ {
+ // Abort can throw if there's no callback, just ignore in this case
+ }
return true;
}
return false;
@@ -56,6 +68,6 @@ class RetryTask implements Runnable
}
private final RetryQueue _queue;
- private final OutgoingAsyncBase _outAsync;
+ private final ProxyOutgoingAsyncBase _outAsync;
private java.util.concurrent.Future<?> _future;
}