summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-11-25 15:05:41 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-11-25 15:05:41 +0100
commit2fca2c1309c4991b21ff956709068122f19eef4a (patch)
treeb90e6fe1450508f5ce2962e21627a4535414e1a6 /java/src
parentUpdate depends for SQL directories (diff)
downloadice-2fca2c1309c4991b21ff956709068122f19eef4a.tar.bz2
ice-2fca2c1309c4991b21ff956709068122f19eef4a.tar.xz
ice-2fca2c1309c4991b21ff956709068122f19eef4a.zip
- Cleaned up test/Ice/operations test
- Added test/Ice/ami test - sent callback is now always called
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AMI_Object_ice_flushBatchRequests.java5
-rw-r--r--java/src/Ice/AMI_Object_ice_invoke.java5
-rw-r--r--java/src/Ice/AsyncResult.java108
-rw-r--r--java/src/Ice/ConnectionI.java45
-rw-r--r--java/src/Ice/ExceptionCallback.java4
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java18
-rw-r--r--java/src/Ice/OnewayCallback.java4
-rw-r--r--java/src/Ice/SentCallback.java33
-rw-r--r--java/src/Ice/TwowayCallback.java4
-rw-r--r--java/src/IceInternal/AsyncStatus.java17
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java14
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java4
-rw-r--r--java/src/IceInternal/OutgoingAsync.java20
-rw-r--r--java/src/IceInternal/ProxyBatchOutgoingAsync.java28
-rw-r--r--java/src/IceInternal/RequestHandler.java4
15 files changed, 181 insertions, 132 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
index 5093553469b..e213ed22fe2 100644
--- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
+++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
@@ -29,9 +29,10 @@ public abstract class AMI_Object_ice_flushBatchRequests extends Callback_Object_
ice_exception(ex);
}
- public final void sent()
+ @Override
+ public final void sent(boolean sentSynchronously)
{
- if(this instanceof AMISentCallback)
+ if(sentSynchronously && this instanceof AMISentCallback)
{
((AMISentCallback)this).ice_sent();
}
diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java
index 8b273f4caab..470d69fa1a3 100644
--- a/java/src/Ice/AMI_Object_ice_invoke.java
+++ b/java/src/Ice/AMI_Object_ice_invoke.java
@@ -42,9 +42,10 @@ public abstract class AMI_Object_ice_invoke extends Callback_Object_ice_invoke
ice_exception(ex);
}
- public final void sent()
+ @Override
+ public final void sent(boolean sentSynchronously)
{
- if(this instanceof AMISentCallback)
+ if(!sentSynchronously && this instanceof AMISentCallback)
{
((AMISentCallback)this).ice_sent();
}
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index a006a83f511..67b3ecbe0b2 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -174,6 +174,72 @@ public class AsyncResult
}
}
+ public final void __exception(LocalException ex)
+ {
+ synchronized(_monitor)
+ {
+ _state |= Done;
+ _exception = ex;
+ _monitor.notifyAll();
+ }
+
+ if(_callback != null)
+ {
+ try
+ {
+ _callback.__completed(this);
+ }
+ catch(RuntimeException exc)
+ {
+ __warning(exc);
+ }
+ }
+ }
+
+ protected final void __sentInternal()
+ {
+ //
+ // Note: no need to change the _state here, specializations are responsible for
+ // changing the state.
+ //
+
+ if(_callback != null)
+ {
+ try
+ {
+ _callback.__sent(this);
+ }
+ catch(RuntimeException ex)
+ {
+ __warning(ex);
+ }
+ }
+ }
+
+ public final void __sentAsync()
+ {
+ //
+ // This is called when it's not safe to call the sent callback synchronously
+ // from this thread. Instead the exception callback is called asynchronously from
+ // the client thread pool.
+ //
+ try
+ {
+ _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem()
+ {
+ public void
+ execute(IceInternal.ThreadPoolCurrent current)
+ {
+ current.ioCompleted();
+ __sentInternal();
+ }
+ });
+ }
+ catch(CommunicatorDestroyedException exc)
+ {
+ }
+ }
+
public static void __check(AsyncResult r, ObjectPrx prx, String operation)
{
__check(r, operation);
@@ -207,28 +273,6 @@ public class AsyncResult
}
}
- public final void __exception(LocalException ex)
- {
- synchronized(_monitor)
- {
- _state |= Done;
- _exception = ex;
- _monitor.notifyAll();
- }
-
- if(_callback != null)
- {
- try
- {
- _callback.__completed(this);
- }
- catch(RuntimeException exc)
- {
- __warning(exc);
- }
- }
- }
-
protected static void __check(AsyncResult r, String operation)
{
if(r == null)
@@ -242,26 +286,6 @@ public class AsyncResult
}
}
- protected final void __sentInternal()
- {
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
- {
- try
- {
- _callback.__sent(this);
- }
- catch(RuntimeException ex)
- {
- __warning(ex);
- }
- }
- }
-
protected final void __response()
{
//
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index f1cbcd0723d..eac4cb11e3b 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -324,7 +324,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean sent = false;
try
{
- sent = sendMessage(new OutgoingMessage(out, out.os(), compress, requestId));
+ OutgoingMessage message = new OutgoingMessage(out, out.os(), compress, requestId);
+ sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0;
}
catch(Ice.LocalException ex)
{
@@ -344,7 +345,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return sent; // The request was sent.
}
- synchronized public boolean
+ synchronized public int
sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
throws IceInternal.LocalExceptionWrapper
{
@@ -389,10 +390,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- boolean sent;
+ int status;
try
{
- sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, requestId));
+ status = sendMessage(new OutgoingMessage(out, out.__os(), compress, requestId));
}
catch(Ice.LocalException ex)
{
@@ -408,7 +409,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_asyncRequests.put(requestId, out);
}
- return sent;
+ return status;
}
public synchronized void
@@ -653,7 +654,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
try
{
OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0);
- sent = sendMessage(message);
+ sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0;
}
catch(Ice.LocalException ex)
{
@@ -672,7 +673,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return sent;
}
- synchronized public boolean
+ synchronized public int
flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
{
while(_batchStreamInUse && _exception == null)
@@ -693,8 +694,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_batchRequestNum == 0)
{
- outAsync.__sent(this);
- return true;
+ int status = IceInternal.AsyncStatus.Sent;
+ if(outAsync.__sent(this))
+ {
+ status |= IceInternal.AsyncStatus.InvokeSentCallback;
+ }
+ return status;
}
//
@@ -705,11 +710,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.swap(outAsync.__os());
- boolean sent;
+ int status;
try
{
OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, 0);
- sent = sendMessage(message);
+ status = sendMessage(message);
}
catch(Ice.LocalException ex)
{
@@ -725,7 +730,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
- return sent;
+ return status;
}
synchronized public void
@@ -1630,7 +1635,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeByte((byte)0); // compression status: always report 0 for CloseConnection in Java.
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
- if(sendMessage(new OutgoingMessage(os, false, false)))
+ if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0)
{
//
// Schedule the close timeout to wait for the peer to close the connection. If
@@ -1854,7 +1859,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return callbacks;
}
- private boolean
+ private int
sendMessage(OutgoingMessage message)
{
assert(_state < StateClosed);
@@ -1862,7 +1867,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
message.adopt();
_sendStreams.addLast(message);
- return false;
+ return IceInternal.AsyncStatus.Queued;
}
//
@@ -1890,12 +1895,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_transceiver.write(message.stream.getBuffer()))
{
- message.sent(this, false);
+ int status = IceInternal.AsyncStatus.Sent;
+ if(message.sent(this, false))
+ {
+ status |= IceInternal.AsyncStatus.InvokeSentCallback;
+ }
if(_acmTimeout > 0)
{
_acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
}
- return true;
+ return status;
}
message.adopt();
@@ -1903,7 +1912,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_sendStreams.addLast(message);
scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
_threadPool.register(this, IceInternal.SocketOperation.Write);
- return false;
+ return IceInternal.AsyncStatus.Queued;
}
private IceInternal.BasicStream
diff --git a/java/src/Ice/ExceptionCallback.java b/java/src/Ice/ExceptionCallback.java
index 733542b213a..b213f639938 100644
--- a/java/src/Ice/ExceptionCallback.java
+++ b/java/src/Ice/ExceptionCallback.java
@@ -26,7 +26,7 @@ public abstract class ExceptionCallback extends Callback
/**
* Called when a queued invocation is sent successfully.
**/
- public void sent()
+ public void sent(boolean sentSynchronously)
{
}
@@ -45,6 +45,6 @@ public abstract class ExceptionCallback extends Callback
public final void __sent(AsyncResult __result)
{
- sent();
+ sent(__result.sentSynchronously());
}
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 171abe39367..891fe6653e4 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -1832,25 +1832,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private AsyncResult
begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase __cb)
{
- IceInternal.BatchOutgoingAsync __result =
+ IceInternal.ProxyBatchOutgoingAsync __result =
new IceInternal.ProxyBatchOutgoingAsync(this, __ice_flushBatchRequests_name, __cb);
try
{
- //
- // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
- // requests were queued with the connection, they would be lost without being noticed.
- //
- _ObjectDel delegate = null;
- int cnt = -1; // Don't retry.
- try
- {
- delegate = __getDelegate(false);
- delegate.__getRequestHandler().flushAsyncBatchRequests(__result);
- }
- catch(LocalException __ex)
- {
- cnt = __handleException(delegate, __ex, null, cnt);
- }
+ __result.__send();
}
catch(LocalException __ex)
{
diff --git a/java/src/Ice/OnewayCallback.java b/java/src/Ice/OnewayCallback.java
index c613a0fadae..7464e397911 100644
--- a/java/src/Ice/OnewayCallback.java
+++ b/java/src/Ice/OnewayCallback.java
@@ -29,13 +29,13 @@ public abstract class OnewayCallback extends IceInternal.CallbackBase
/**
* Called when a queued invocation is sent successfully.
**/
- public void sent()
+ public void sent(boolean sentSynchronously)
{
}
public final void __sent(AsyncResult __result)
{
- sent();
+ sent(__result.sentSynchronously());
}
public final void __completed(AsyncResult __result)
diff --git a/java/src/Ice/SentCallback.java b/java/src/Ice/SentCallback.java
deleted file mode 100644
index 1a2f8a08009..00000000000
--- a/java/src/Ice/SentCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package Ice;
-
-/**
- * An application can optionally supply an instance of this class in an
- * asynchronous invocation. The application must create a subclass and
- * implement the sent method.
- **/
-public abstract class SentCallback extends Callback
-{
- /**
- * Called when a queued invocation is sent successfully.
- **/
- public abstract void sent();
-
- public final void __completed(AsyncResult __result)
- {
- // Ignore.
- }
-
- public final void __sent(AsyncResult __result)
- {
- sent();
- }
-}
diff --git a/java/src/Ice/TwowayCallback.java b/java/src/Ice/TwowayCallback.java
index 0ebe4316ef8..78b022fd58a 100644
--- a/java/src/Ice/TwowayCallback.java
+++ b/java/src/Ice/TwowayCallback.java
@@ -24,12 +24,12 @@ public abstract class TwowayCallback extends IceInternal.CallbackBase
/**
* Called when a queued invocation is sent successfully.
**/
- public void sent()
+ public void sent(boolean sentSynchronously)
{
}
public final void __sent(AsyncResult __result)
{
- sent();
+ sent(__result.sentSynchronously());
}
}
diff --git a/java/src/IceInternal/AsyncStatus.java b/java/src/IceInternal/AsyncStatus.java
new file mode 100644
index 00000000000..6d62be78cb4
--- /dev/null
+++ b/java/src/IceInternal/AsyncStatus.java
@@ -0,0 +1,17 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class AsyncStatus
+{
+ public static final int Queued = 0;
+ public static final int Sent = 1;
+ public static final int InvokeSentCallback = 2;
+}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 6b1a0a34d6b..8bd1ca9f085 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -145,7 +145,7 @@ public class ConnectRequestHandler
}
}
- public boolean
+ public int
sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper
{
@@ -154,7 +154,7 @@ public class ConnectRequestHandler
if(!initialized())
{
_requests.add(new Request(out));
- return false;
+ return AsyncStatus.Queued;
}
}
return _connection.sendAsyncRequest(out, _compress, _response);
@@ -166,7 +166,7 @@ public class ConnectRequestHandler
return getConnection(true).flushBatchRequests(out);
}
- public boolean
+ public int
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
synchronized(this)
@@ -174,7 +174,7 @@ public class ConnectRequestHandler
if(!initialized())
{
_requests.add(new Request(out));
- return false;
+ return AsyncStatus.Queued;
}
}
return _connection.flushAsyncBatchRequests(out);
@@ -409,14 +409,16 @@ public class ConnectRequestHandler
Request request = p.next();
if(request.out != null)
{
- if(_connection.sendAsyncRequest(request.out, _compress, _response))
+ if((_connection.sendAsyncRequest(request.out, _compress, _response) &
+ AsyncStatus.InvokeSentCallback) > 0)
{
sentCallbacks.add(request.out);
}
}
else if(request.batchOut != null)
{
- if(_connection.flushAsyncBatchRequests(request.batchOut))
+ if((_connection.flushAsyncBatchRequests(request.batchOut) &
+ AsyncStatus.InvokeSentCallback) > 0)
{
sentCallbacks.add(request.batchOut);
}
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index 1f085f49dfb..61c6ee8daf3 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -44,7 +44,7 @@ public class ConnectionRequestHandler implements RequestHandler
}
}
- public boolean
+ public int
sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper
{
@@ -57,7 +57,7 @@ public class ConnectionRequestHandler implements RequestHandler
return _connection.flushBatchRequests(out);
}
- public boolean
+ public int
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
return _connection.flushAsyncBatchRequests(out);
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 75f52715837..65139041153 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -355,10 +355,24 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
try
{
_delegate = _proxy.__getDelegate(true);
- boolean sent = _delegate.__getRequestHandler().sendAsyncRequest(this);
- if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread.
+ int status = _delegate.__getRequestHandler().sendAsyncRequest(this);
+ if((status & AsyncStatus.Sent) > 0)
{
- _sentSynchronously = sent;
+ if(synchronous)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ __sent(); // Call from the user thread.
+ }
+ }
+ else
+ {
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ __sentAsync(); // Call from a client thread pool thread.
+ }
+ }
}
break;
}
diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
index c6f7efdbe5b..aa8d9133430 100644
--- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
@@ -17,6 +17,34 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
_proxy = prx;
}
+ public void
+ __send()
+ {
+ //
+ // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
+ // requests were queued with the connection, they would be lost without being noticed.
+ //
+ Ice._ObjectDel delegate = null;
+ int cnt = -1; // Don't retry.
+ try
+ {
+ delegate = ((Ice.ObjectPrxHelperBase)_proxy).__getDelegate(false);
+ int status = delegate.__getRequestHandler().flushAsyncBatchRequests(this);
+ if((status & AsyncStatus.Sent) > 0)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ __sent();
+ }
+ }
+ }
+ catch(Ice.LocalException __ex)
+ {
+ cnt = ((Ice.ObjectPrxHelperBase)_proxy).__handleException(delegate, __ex, null, cnt);
+ }
+ }
+
public Ice.ObjectPrx getProxy()
{
return _proxy;
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index b5bf28c4d99..ac9b181fd11 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -19,11 +19,11 @@ public interface RequestHandler
Ice.ConnectionI sendRequest(Outgoing out)
throws LocalExceptionWrapper;
- boolean sendAsyncRequest(OutgoingAsync out)
+ int sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper;
boolean flushBatchRequests(BatchOutgoing out);
- boolean flushAsyncBatchRequests(BatchOutgoingAsync out);
+ int flushAsyncBatchRequests(BatchOutgoingAsync out);
Reference getReference();