diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/AsyncResult.java | 2 | ||||
-rw-r--r-- | java/src/Ice/Callback_Communicator_flushBatchRequests.java | 49 | ||||
-rw-r--r-- | java/src/Ice/Callback_Connection_flushBatchRequests.java | 49 | ||||
-rw-r--r-- | java/src/Ice/Callback_Object_ice_flushBatchRequests.java | 5 | ||||
-rw-r--r-- | java/src/Ice/CommunicatorI.java | 55 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 43 | ||||
-rw-r--r-- | java/src/Ice/ObjectAdapterI.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/CommunicatorBatchOutgoingAsync.java | 134 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectionBatchOutgoingAsync.java | 40 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 21 | ||||
-rw-r--r-- | java/src/IceInternal/ObjectAdapterFactory.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 21 |
13 files changed, 393 insertions, 40 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 24b17fe10db..6f09388abff 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -66,7 +66,7 @@ public class AsyncResult { synchronized(_monitor) { - return (_state & (Sent | Done)) > 0; + return (_state & Sent) > 0; } } diff --git a/java/src/Ice/Callback_Communicator_flushBatchRequests.java b/java/src/Ice/Callback_Communicator_flushBatchRequests.java new file mode 100644 index 00000000000..360ba67cc3b --- /dev/null +++ b/java/src/Ice/Callback_Communicator_flushBatchRequests.java @@ -0,0 +1,49 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +/** + * Asynchronous callback base class for Communicator.begin_flushBatchRequests. + **/ +public abstract class Callback_Communicator_flushBatchRequests extends IceInternal.CallbackBase +{ + /** + * Called when the invocation raises an Ice run-time exception. + * + * @param ex The Ice run-time exception raised by the operation. + **/ + public abstract void exception(LocalException ex); + + /** + * Called when a queued invocation is sent successfully. + **/ + public void sent(boolean sentSynchronously) + { + } + + @Override + public final void __completed(AsyncResult __result) + { + try + { + __result.getCommunicator().end_flushBatchRequests(__result); + } + catch(LocalException __ex) + { + exception(__ex); + } + } + + @Override + public final void __sent(AsyncResult __result) + { + sent(__result.sentSynchronously()); + } +} diff --git a/java/src/Ice/Callback_Connection_flushBatchRequests.java b/java/src/Ice/Callback_Connection_flushBatchRequests.java new file mode 100644 index 00000000000..08a1711ccff --- /dev/null +++ b/java/src/Ice/Callback_Connection_flushBatchRequests.java @@ -0,0 +1,49 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +/** + * Asynchronous callback base class for Connection.begin_flushBatchRequests. + **/ +public abstract class Callback_Connection_flushBatchRequests extends IceInternal.CallbackBase +{ + /** + * Called when the invocation raises an Ice run-time exception. + * + * @param ex The Ice run-time exception raised by the operation. + **/ + public abstract void exception(LocalException ex); + + /** + * Called when a queued invocation is sent successfully. + **/ + public void sent(boolean sentSynchronously) + { + } + + @Override + public final void __completed(AsyncResult __result) + { + try + { + __result.getConnection().end_flushBatchRequests(__result); + } + catch(LocalException __ex) + { + exception(__ex); + } + } + + @Override + public final void __sent(AsyncResult __result) + { + sent(__result.sentSynchronously()); + } +} diff --git a/java/src/Ice/Callback_Object_ice_flushBatchRequests.java b/java/src/Ice/Callback_Object_ice_flushBatchRequests.java index 423d7104992..9d0d784d2ea 100644 --- a/java/src/Ice/Callback_Object_ice_flushBatchRequests.java +++ b/java/src/Ice/Callback_Object_ice_flushBatchRequests.java @@ -14,4 +14,9 @@ package Ice; **/ public abstract class Callback_Object_ice_flushBatchRequests extends OnewayCallback { + @Override + public final void response() + { + // Not used. + } } diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 38568febdaa..5dd57ea5c00 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -178,7 +178,60 @@ public final class CommunicatorI implements Communicator public void flushBatchRequests() { - _instance.flushBatchRequests(); + AsyncResult r = begin_flushBatchRequests(); + end_flushBatchRequests(r); + } + + public AsyncResult + begin_flushBatchRequests() + { + return begin_flushBatchRequestsInternal(null); + } + + public AsyncResult + begin_flushBatchRequests(Callback cb) + { + return begin_flushBatchRequestsInternal(cb); + } + + public AsyncResult + begin_flushBatchRequests(Callback_Communicator_flushBatchRequests cb) + { + return begin_flushBatchRequestsInternal(cb); + } + + private static final String __flushBatchRequests_name = "flushBatchRequests"; + + private Ice.AsyncResult + begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + { + IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory(); + IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory(); + + // + // This callback object receives the results of all invocations + // of Connection.begin_flushBatchRequests. + // + IceInternal.CommunicatorBatchOutgoingAsync result = + new IceInternal.CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb); + + connectionFactory.flushAsyncBatchRequests(result); + adapterFactory.flushAsyncBatchRequests(result); + + // + // Inform the callback that we have finished initiating all of the + // flush requests. + // + result.ready(); + + return result; + } + + public void + end_flushBatchRequests(AsyncResult r) + { + AsyncResult.__check(r, this, __flushBatchRequests_name); + r.__wait(); } public ObjectPrx diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 9153385567e..d157b1858b1 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -617,6 +617,49 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne out.invoke(); } + private static final String __flushBatchRequests_name = "flushBatchRequests"; + + public Ice.AsyncResult + begin_flushBatchRequests() + { + return begin_flushBatchRequestsInternal(null); + } + + public Ice.AsyncResult + begin_flushBatchRequests(Callback cb) + { + return begin_flushBatchRequestsInternal(cb); + } + + public Ice.AsyncResult + begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) + { + return begin_flushBatchRequestsInternal(cb); + } + + private Ice.AsyncResult + begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + { + IceInternal.ConnectionBatchOutgoingAsync result = + new IceInternal.ConnectionBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb); + try + { + result.__send(); + } + catch(LocalException __ex) + { + result.__exceptionAsync(__ex); + } + return result; + } + + public void + end_flushBatchRequests(AsyncResult r) + { + AsyncResult.__check(r, this, __flushBatchRequests_name); + r.__wait(); + } + synchronized public boolean flushBatchRequests(IceInternal.BatchOutgoing out) { diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 67229763453..b4bd577a414 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -732,7 +732,7 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushBatchRequests() + flushAsyncBatchRequests(IceInternal.CommunicatorBatchOutgoingAsync outAsync) { java.util.List<IceInternal.IncomingConnectionFactory> f; synchronized(this) @@ -741,7 +741,7 @@ public final class ObjectAdapterI implements ObjectAdapter } for(IceInternal.IncomingConnectionFactory p : f) { - p.flushBatchRequests(); + p.flushAsyncBatchRequests(outAsync); } } diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java new file mode 100644 index 00000000000..3e0fbda6ae2 --- /dev/null +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -0,0 +1,134 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class CommunicatorBatchOutgoingAsync extends BatchOutgoingAsync +{ + public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, + CallbackBase callback) + { + super(instance, operation, callback); + _communicator = communicator; + + // + // _useCount is initialized to 1 to prevent premature callbacks. + // The caller must invoke ready() after all flush requests have + // been initiated. + // + _useCount = 1; + + // + // Assume all connections are flushed synchronously. + // + _sentSynchronously = true; + } + + @Override + public Ice.Communicator getCommunicator() + { + return _communicator; + } + + public void flushConnection(Ice.Connection con) + { + synchronized(_monitor) + { + ++_useCount; + } + con.begin_flushBatchRequests(_cb); + } + + public void ready() + { + check(null, null, true); + } + + private void completed(Ice.AsyncResult r) + { + Ice.Connection con = r.getConnection(); + assert(con != null); + + try + { + con.end_flushBatchRequests(r); + assert(false); // completed() should only be called when an exception occurs. + } + catch(Ice.LocalException ex) + { + check(r, ex, false); + } + } + + private void sent(Ice.AsyncResult r) + { + check(r, null, r.sentSynchronously()); + } + + private void check(Ice.AsyncResult r, Ice.LocalException ex, boolean userThread) + { + boolean done = false; + + synchronized(_monitor) + { + assert(_useCount > 0); + --_useCount; + + // + // We report that the communicator flush request was sent synchronously + // if all of the connection flush requests are sent synchronously. + // + if((r != null && !r.sentSynchronously()) || ex != null) + { + _sentSynchronously = false; + } + + if(_useCount == 0) + { + done = true; + _state |= Done | OK | Sent; + _monitor.notifyAll(); + } + } + + if(done) + { + // + // sentSynchronously_ is immutable here. + // + if(!_sentSynchronously && userThread) + { + __sentAsync(); + } + else + { + assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. + __sent(); + } + } + } + + private Ice.Communicator _communicator; + private int _useCount; + + private Ice.AsyncCallback _cb = new Ice.AsyncCallback() + { + @Override + public void completed(Ice.AsyncResult r) + { + CommunicatorBatchOutgoingAsync.this.completed(r); + } + + @Override + public void sent(Ice.AsyncResult r) + { + CommunicatorBatchOutgoingAsync.this.sent(r); + } + }; +} diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java new file mode 100644 index 00000000000..637431baa7b --- /dev/null +++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java @@ -0,0 +1,40 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync +{ + public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Instance instance, String operation, CallbackBase callback) + { + super(instance, operation, callback); + _connection = con; + } + + public void __send() + { + int status = _connection.flushAsyncBatchRequests(this); + if((status & AsyncStatus.Sent) > 0) + { + _sentSynchronously = true; + if((status & AsyncStatus.InvokeSentCallback) > 0) + { + __sent(); + } + } + } + + @Override + public Ice.Connection getConnection() + { + return _connection; + } + + private Ice.ConnectionI _connection; +} diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 17d046cb4f0..48c7b1aea5f 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -149,13 +149,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } public void - flushBatchRequests() + flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) { - for(Ice.ConnectionI p : connections()) // connections() is synchronized, no need to synchronize here. + for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { try { - p.flushBatchRequests(); + outAsync.flushConnection(c); } catch(Ice.LocalException ex) { diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index ede817a338c..be0395fa038 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -269,27 +269,6 @@ public final class Instance return _implicitContext; } - public void - flushBatchRequests() - { - OutgoingConnectionFactory connectionFactory; - ObjectAdapterFactory adapterFactory; - - synchronized(this) - { - if(_state == StateDestroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - connectionFactory = _outgoingConnectionFactory; - adapterFactory = _objectAdapterFactory; - } - - connectionFactory.flushBatchRequests(); - adapterFactory.flushBatchRequests(); - } - public Ice.Identity stringToIdentity(String s) { diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index 381a317e6e5..adda15e2e5d 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -179,7 +179,7 @@ public final class ObjectAdapterFactory } public void - flushBatchRequests() + flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) { java.util.List<Ice.ObjectAdapterI> adapters; synchronized(this) @@ -189,7 +189,7 @@ public final class ObjectAdapterFactory for(Ice.ObjectAdapterI adapter : adapters) { - adapter.flushBatchRequests(); + adapter.flushAsyncBatchRequests(outAsync); } } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 638f84aab15..a3e5f01694e 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -378,22 +378,23 @@ public final class OutgoingConnectionFactory } public void - flushBatchRequests() + flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync) { java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>(); synchronized(this) { - if(_destroyed) - { - return; - } - - for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) + if(!_destroyed) { - for(Ice.ConnectionI connection : connectionList) + for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) { - c.add(connection); + for(Ice.ConnectionI connection : connectionList) + { + if(connection.isActiveOrHolding()) + { + c.add(connection); + } + } } } } @@ -402,7 +403,7 @@ public final class OutgoingConnectionFactory { try { - conn.flushBatchRequests(); + outAsync.flushConnection(conn); } catch(Ice.LocalException ex) { |