diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-11-25 15:05:41 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-11-25 15:05:41 +0100 |
commit | 2fca2c1309c4991b21ff956709068122f19eef4a (patch) | |
tree | b90e6fe1450508f5ce2962e21627a4535414e1a6 /java/src | |
parent | Update depends for SQL directories (diff) | |
download | ice-2fca2c1309c4991b21ff956709068122f19eef4a.tar.bz2 ice-2fca2c1309c4991b21ff956709068122f19eef4a.tar.xz ice-2fca2c1309c4991b21ff956709068122f19eef4a.zip |
- Cleaned up test/Ice/operations test
- Added test/Ice/ami test
- sent callback is now always called
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/AMI_Object_ice_flushBatchRequests.java | 5 | ||||
-rw-r--r-- | java/src/Ice/AMI_Object_ice_invoke.java | 5 | ||||
-rw-r--r-- | java/src/Ice/AsyncResult.java | 108 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 45 | ||||
-rw-r--r-- | java/src/Ice/ExceptionCallback.java | 4 | ||||
-rw-r--r-- | java/src/Ice/ObjectPrxHelperBase.java | 18 | ||||
-rw-r--r-- | java/src/Ice/OnewayCallback.java | 4 | ||||
-rw-r--r-- | java/src/Ice/SentCallback.java | 33 | ||||
-rw-r--r-- | java/src/Ice/TwowayCallback.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/AsyncStatus.java | 17 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 14 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectionRequestHandler.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 20 | ||||
-rw-r--r-- | java/src/IceInternal/ProxyBatchOutgoingAsync.java | 28 | ||||
-rw-r--r-- | java/src/IceInternal/RequestHandler.java | 4 |
15 files changed, 181 insertions, 132 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java index 5093553469b..e213ed22fe2 100644 --- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java +++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java @@ -29,9 +29,10 @@ public abstract class AMI_Object_ice_flushBatchRequests extends Callback_Object_ ice_exception(ex); } - public final void sent() + @Override + public final void sent(boolean sentSynchronously) { - if(this instanceof AMISentCallback) + if(sentSynchronously && this instanceof AMISentCallback) { ((AMISentCallback)this).ice_sent(); } diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java index 8b273f4caab..470d69fa1a3 100644 --- a/java/src/Ice/AMI_Object_ice_invoke.java +++ b/java/src/Ice/AMI_Object_ice_invoke.java @@ -42,9 +42,10 @@ public abstract class AMI_Object_ice_invoke extends Callback_Object_ice_invoke ice_exception(ex); } - public final void sent() + @Override + public final void sent(boolean sentSynchronously) { - if(this instanceof AMISentCallback) + if(!sentSynchronously && this instanceof AMISentCallback) { ((AMISentCallback)this).ice_sent(); } diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index a006a83f511..67b3ecbe0b2 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -174,6 +174,72 @@ public class AsyncResult } } + public final void __exception(LocalException ex) + { + synchronized(_monitor) + { + _state |= Done; + _exception = ex; + _monitor.notifyAll(); + } + + if(_callback != null) + { + try + { + _callback.__completed(this); + } + catch(RuntimeException exc) + { + __warning(exc); + } + } + } + + protected final void __sentInternal() + { + // + // Note: no need to change the _state here, specializations are responsible for + // changing the state. + // + + if(_callback != null) + { + try + { + _callback.__sent(this); + } + catch(RuntimeException ex) + { + __warning(ex); + } + } + } + + public final void __sentAsync() + { + // + // This is called when it's not safe to call the sent callback synchronously + // from this thread. Instead the exception callback is called asynchronously from + // the client thread pool. + // + try + { + _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem() + { + public void + execute(IceInternal.ThreadPoolCurrent current) + { + current.ioCompleted(); + __sentInternal(); + } + }); + } + catch(CommunicatorDestroyedException exc) + { + } + } + public static void __check(AsyncResult r, ObjectPrx prx, String operation) { __check(r, operation); @@ -207,28 +273,6 @@ public class AsyncResult } } - public final void __exception(LocalException ex) - { - synchronized(_monitor) - { - _state |= Done; - _exception = ex; - _monitor.notifyAll(); - } - - if(_callback != null) - { - try - { - _callback.__completed(this); - } - catch(RuntimeException exc) - { - __warning(exc); - } - } - } - protected static void __check(AsyncResult r, String operation) { if(r == null) @@ -242,26 +286,6 @@ public class AsyncResult } } - protected final void __sentInternal() - { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback != null) - { - try - { - _callback.__sent(this); - } - catch(RuntimeException ex) - { - __warning(ex); - } - } - } - protected final void __response() { // diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index f1cbcd0723d..eac4cb11e3b 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -324,7 +324,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne boolean sent = false; try { - sent = sendMessage(new OutgoingMessage(out, out.os(), compress, requestId)); + OutgoingMessage message = new OutgoingMessage(out, out.os(), compress, requestId); + sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; } catch(Ice.LocalException ex) { @@ -344,7 +345,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return sent; // The request was sent. } - synchronized public boolean + synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) throws IceInternal.LocalExceptionWrapper { @@ -389,10 +390,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - boolean sent; + int status; try { - sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, requestId)); + status = sendMessage(new OutgoingMessage(out, out.__os(), compress, requestId)); } catch(Ice.LocalException ex) { @@ -408,7 +409,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _asyncRequests.put(requestId, out); } - return sent; + return status; } public synchronized void @@ -653,7 +654,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne try { OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0); - sent = sendMessage(message); + sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; } catch(Ice.LocalException ex) { @@ -672,7 +673,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return sent; } - synchronized public boolean + synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { while(_batchStreamInUse && _exception == null) @@ -693,8 +694,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchRequestNum == 0) { - outAsync.__sent(this); - return true; + int status = IceInternal.AsyncStatus.Sent; + if(outAsync.__sent(this)) + { + status |= IceInternal.AsyncStatus.InvokeSentCallback; + } + return status; } // @@ -705,11 +710,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.swap(outAsync.__os()); - boolean sent; + int status; try { OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, 0); - sent = sendMessage(message); + status = sendMessage(message); } catch(Ice.LocalException ex) { @@ -725,7 +730,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; - return sent; + return status; } synchronized public void @@ -1630,7 +1635,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeByte((byte)0); // compression status: always report 0 for CloseConnection in Java. os.writeInt(IceInternal.Protocol.headerSize); // Message size. - if(sendMessage(new OutgoingMessage(os, false, false))) + if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0) { // // Schedule the close timeout to wait for the peer to close the connection. If @@ -1854,7 +1859,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return callbacks; } - private boolean + private int sendMessage(OutgoingMessage message) { assert(_state < StateClosed); @@ -1862,7 +1867,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { message.adopt(); _sendStreams.addLast(message); - return false; + return IceInternal.AsyncStatus.Queued; } // @@ -1890,12 +1895,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_transceiver.write(message.stream.getBuffer())) { - message.sent(this, false); + int status = IceInternal.AsyncStatus.Sent; + if(message.sent(this, false)) + { + status |= IceInternal.AsyncStatus.InvokeSentCallback; + } if(_acmTimeout > 0) { _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; } - return true; + return status; } message.adopt(); @@ -1903,7 +1912,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _sendStreams.addLast(message); scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); _threadPool.register(this, IceInternal.SocketOperation.Write); - return false; + return IceInternal.AsyncStatus.Queued; } private IceInternal.BasicStream diff --git a/java/src/Ice/ExceptionCallback.java b/java/src/Ice/ExceptionCallback.java index 733542b213a..b213f639938 100644 --- a/java/src/Ice/ExceptionCallback.java +++ b/java/src/Ice/ExceptionCallback.java @@ -26,7 +26,7 @@ public abstract class ExceptionCallback extends Callback /** * Called when a queued invocation is sent successfully. **/ - public void sent() + public void sent(boolean sentSynchronously) { } @@ -45,6 +45,6 @@ public abstract class ExceptionCallback extends Callback public final void __sent(AsyncResult __result) { - sent(); + sent(__result.sentSynchronously()); } } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 171abe39367..891fe6653e4 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -1832,25 +1832,11 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private AsyncResult begin_ice_flushBatchRequestsInternal(IceInternal.CallbackBase __cb) { - IceInternal.BatchOutgoingAsync __result = + IceInternal.ProxyBatchOutgoingAsync __result = new IceInternal.ProxyBatchOutgoingAsync(this, __ice_flushBatchRequests_name, __cb); try { - // - // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch - // requests were queued with the connection, they would be lost without being noticed. - // - _ObjectDel delegate = null; - int cnt = -1; // Don't retry. - try - { - delegate = __getDelegate(false); - delegate.__getRequestHandler().flushAsyncBatchRequests(__result); - } - catch(LocalException __ex) - { - cnt = __handleException(delegate, __ex, null, cnt); - } + __result.__send(); } catch(LocalException __ex) { diff --git a/java/src/Ice/OnewayCallback.java b/java/src/Ice/OnewayCallback.java index c613a0fadae..7464e397911 100644 --- a/java/src/Ice/OnewayCallback.java +++ b/java/src/Ice/OnewayCallback.java @@ -29,13 +29,13 @@ public abstract class OnewayCallback extends IceInternal.CallbackBase /** * Called when a queued invocation is sent successfully. **/ - public void sent() + public void sent(boolean sentSynchronously) { } public final void __sent(AsyncResult __result) { - sent(); + sent(__result.sentSynchronously()); } public final void __completed(AsyncResult __result) diff --git a/java/src/Ice/SentCallback.java b/java/src/Ice/SentCallback.java deleted file mode 100644 index 1a2f8a08009..00000000000 --- a/java/src/Ice/SentCallback.java +++ /dev/null @@ -1,33 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package Ice; - -/** - * An application can optionally supply an instance of this class in an - * asynchronous invocation. The application must create a subclass and - * implement the sent method. - **/ -public abstract class SentCallback extends Callback -{ - /** - * Called when a queued invocation is sent successfully. - **/ - public abstract void sent(); - - public final void __completed(AsyncResult __result) - { - // Ignore. - } - - public final void __sent(AsyncResult __result) - { - sent(); - } -} diff --git a/java/src/Ice/TwowayCallback.java b/java/src/Ice/TwowayCallback.java index 0ebe4316ef8..78b022fd58a 100644 --- a/java/src/Ice/TwowayCallback.java +++ b/java/src/Ice/TwowayCallback.java @@ -24,12 +24,12 @@ public abstract class TwowayCallback extends IceInternal.CallbackBase /** * Called when a queued invocation is sent successfully. **/ - public void sent() + public void sent(boolean sentSynchronously) { } public final void __sent(AsyncResult __result) { - sent(); + sent(__result.sentSynchronously()); } } diff --git a/java/src/IceInternal/AsyncStatus.java b/java/src/IceInternal/AsyncStatus.java new file mode 100644 index 00000000000..6d62be78cb4 --- /dev/null +++ b/java/src/IceInternal/AsyncStatus.java @@ -0,0 +1,17 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class AsyncStatus +{ + public static final int Queued = 0; + public static final int Sent = 1; + public static final int InvokeSentCallback = 2; +} diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 6b1a0a34d6b..8bd1ca9f085 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -145,7 +145,7 @@ public class ConnectRequestHandler } } - public boolean + public int sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper { @@ -154,7 +154,7 @@ public class ConnectRequestHandler if(!initialized()) { _requests.add(new Request(out)); - return false; + return AsyncStatus.Queued; } } return _connection.sendAsyncRequest(out, _compress, _response); @@ -166,7 +166,7 @@ public class ConnectRequestHandler return getConnection(true).flushBatchRequests(out); } - public boolean + public int flushAsyncBatchRequests(BatchOutgoingAsync out) { synchronized(this) @@ -174,7 +174,7 @@ public class ConnectRequestHandler if(!initialized()) { _requests.add(new Request(out)); - return false; + return AsyncStatus.Queued; } } return _connection.flushAsyncBatchRequests(out); @@ -409,14 +409,16 @@ public class ConnectRequestHandler Request request = p.next(); if(request.out != null) { - if(_connection.sendAsyncRequest(request.out, _compress, _response)) + if((_connection.sendAsyncRequest(request.out, _compress, _response) & + AsyncStatus.InvokeSentCallback) > 0) { sentCallbacks.add(request.out); } } else if(request.batchOut != null) { - if(_connection.flushAsyncBatchRequests(request.batchOut)) + if((_connection.flushAsyncBatchRequests(request.batchOut) & + AsyncStatus.InvokeSentCallback) > 0) { sentCallbacks.add(request.batchOut); } diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 1f085f49dfb..61c6ee8daf3 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -44,7 +44,7 @@ public class ConnectionRequestHandler implements RequestHandler } } - public boolean + public int sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper { @@ -57,7 +57,7 @@ public class ConnectionRequestHandler implements RequestHandler return _connection.flushBatchRequests(out); } - public boolean + public int flushAsyncBatchRequests(BatchOutgoingAsync out) { return _connection.flushAsyncBatchRequests(out); diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 75f52715837..65139041153 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -355,10 +355,24 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa try { _delegate = _proxy.__getDelegate(true); - boolean sent = _delegate.__getRequestHandler().sendAsyncRequest(this); - if(synchronous) // Only set sentSynchronously_ If called synchronously by the user thread. + int status = _delegate.__getRequestHandler().sendAsyncRequest(this); + if((status & AsyncStatus.Sent) > 0) { - _sentSynchronously = sent; + if(synchronous) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + __sent(); // Call from the user thread. + } + } + else + { + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + __sentAsync(); // Call from a client thread pool thread. + } + } } break; } diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index c6f7efdbe5b..aa8d9133430 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -17,6 +17,34 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync _proxy = prx; } + public void + __send() + { + // + // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch + // requests were queued with the connection, they would be lost without being noticed. + // + Ice._ObjectDel delegate = null; + int cnt = -1; // Don't retry. + try + { + delegate = ((Ice.ObjectPrxHelperBase)_proxy).__getDelegate(false); + int status = delegate.__getRequestHandler().flushAsyncBatchRequests(this); + if((status & AsyncStatus.Sent) > 0) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + __sent(); + } + } + } + catch(Ice.LocalException __ex) + { + cnt = ((Ice.ObjectPrxHelperBase)_proxy).__handleException(delegate, __ex, null, cnt); + } + } + public Ice.ObjectPrx getProxy() { return _proxy; diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index b5bf28c4d99..ac9b181fd11 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -19,11 +19,11 @@ public interface RequestHandler Ice.ConnectionI sendRequest(Outgoing out) throws LocalExceptionWrapper; - boolean sendAsyncRequest(OutgoingAsync out) + int sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper; boolean flushBatchRequests(BatchOutgoing out); - boolean flushAsyncBatchRequests(BatchOutgoingAsync out); + int flushAsyncBatchRequests(BatchOutgoingAsync out); Reference getReference(); |