summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AsyncResult.java2
-rw-r--r--java/src/Ice/Callback_Communicator_flushBatchRequests.java49
-rw-r--r--java/src/Ice/Callback_Connection_flushBatchRequests.java49
-rw-r--r--java/src/Ice/Callback_Object_ice_flushBatchRequests.java5
-rw-r--r--java/src/Ice/CommunicatorI.java55
-rw-r--r--java/src/Ice/ConnectionI.java43
-rw-r--r--java/src/Ice/ObjectAdapterI.java4
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java134
-rw-r--r--java/src/IceInternal/ConnectionBatchOutgoingAsync.java40
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java6
-rw-r--r--java/src/IceInternal/Instance.java21
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java4
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java21
13 files changed, 393 insertions, 40 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index 24b17fe10db..6f09388abff 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -66,7 +66,7 @@ public class AsyncResult
{
synchronized(_monitor)
{
- return (_state & (Sent | Done)) > 0;
+ return (_state & Sent) > 0;
}
}
diff --git a/java/src/Ice/Callback_Communicator_flushBatchRequests.java b/java/src/Ice/Callback_Communicator_flushBatchRequests.java
new file mode 100644
index 00000000000..360ba67cc3b
--- /dev/null
+++ b/java/src/Ice/Callback_Communicator_flushBatchRequests.java
@@ -0,0 +1,49 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+/**
+ * Asynchronous callback base class for Communicator.begin_flushBatchRequests.
+ **/
+public abstract class Callback_Communicator_flushBatchRequests extends IceInternal.CallbackBase
+{
+ /**
+ * Called when the invocation raises an Ice run-time exception.
+ *
+ * @param ex The Ice run-time exception raised by the operation.
+ **/
+ public abstract void exception(LocalException ex);
+
+ /**
+ * Called when a queued invocation is sent successfully.
+ **/
+ public void sent(boolean sentSynchronously)
+ {
+ }
+
+ @Override
+ public final void __completed(AsyncResult __result)
+ {
+ try
+ {
+ __result.getCommunicator().end_flushBatchRequests(__result);
+ }
+ catch(LocalException __ex)
+ {
+ exception(__ex);
+ }
+ }
+
+ @Override
+ public final void __sent(AsyncResult __result)
+ {
+ sent(__result.sentSynchronously());
+ }
+}
diff --git a/java/src/Ice/Callback_Connection_flushBatchRequests.java b/java/src/Ice/Callback_Connection_flushBatchRequests.java
new file mode 100644
index 00000000000..08a1711ccff
--- /dev/null
+++ b/java/src/Ice/Callback_Connection_flushBatchRequests.java
@@ -0,0 +1,49 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+/**
+ * Asynchronous callback base class for Connection.begin_flushBatchRequests.
+ **/
+public abstract class Callback_Connection_flushBatchRequests extends IceInternal.CallbackBase
+{
+ /**
+ * Called when the invocation raises an Ice run-time exception.
+ *
+ * @param ex The Ice run-time exception raised by the operation.
+ **/
+ public abstract void exception(LocalException ex);
+
+ /**
+ * Called when a queued invocation is sent successfully.
+ **/
+ public void sent(boolean sentSynchronously)
+ {
+ }
+
+ @Override
+ public final void __completed(AsyncResult __result)
+ {
+ try
+ {
+ __result.getConnection().end_flushBatchRequests(__result);
+ }
+ catch(LocalException __ex)
+ {
+ exception(__ex);
+ }
+ }
+
+ @Override
+ public final void __sent(AsyncResult __result)
+ {
+ sent(__result.sentSynchronously());
+ }
+}
diff --git a/java/src/Ice/Callback_Object_ice_flushBatchRequests.java b/java/src/Ice/Callback_Object_ice_flushBatchRequests.java
index 423d7104992..9d0d784d2ea 100644
--- a/java/src/Ice/Callback_Object_ice_flushBatchRequests.java
+++ b/java/src/Ice/Callback_Object_ice_flushBatchRequests.java
@@ -14,4 +14,9 @@ package Ice;
**/
public abstract class Callback_Object_ice_flushBatchRequests extends OnewayCallback
{
+ @Override
+ public final void response()
+ {
+ // Not used.
+ }
}
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 38568febdaa..5dd57ea5c00 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -178,7 +178,60 @@ public final class CommunicatorI implements Communicator
public void
flushBatchRequests()
{
- _instance.flushBatchRequests();
+ AsyncResult r = begin_flushBatchRequests();
+ end_flushBatchRequests(r);
+ }
+
+ public AsyncResult
+ begin_flushBatchRequests()
+ {
+ return begin_flushBatchRequestsInternal(null);
+ }
+
+ public AsyncResult
+ begin_flushBatchRequests(Callback cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ public AsyncResult
+ begin_flushBatchRequests(Callback_Communicator_flushBatchRequests cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ private static final String __flushBatchRequests_name = "flushBatchRequests";
+
+ private Ice.AsyncResult
+ begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ {
+ IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory();
+ IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory();
+
+ //
+ // This callback object receives the results of all invocations
+ // of Connection.begin_flushBatchRequests.
+ //
+ IceInternal.CommunicatorBatchOutgoingAsync result =
+ new IceInternal.CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb);
+
+ connectionFactory.flushAsyncBatchRequests(result);
+ adapterFactory.flushAsyncBatchRequests(result);
+
+ //
+ // Inform the callback that we have finished initiating all of the
+ // flush requests.
+ //
+ result.ready();
+
+ return result;
+ }
+
+ public void
+ end_flushBatchRequests(AsyncResult r)
+ {
+ AsyncResult.__check(r, this, __flushBatchRequests_name);
+ r.__wait();
}
public ObjectPrx
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 9153385567e..d157b1858b1 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -617,6 +617,49 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
out.invoke();
}
+ private static final String __flushBatchRequests_name = "flushBatchRequests";
+
+ public Ice.AsyncResult
+ begin_flushBatchRequests()
+ {
+ return begin_flushBatchRequestsInternal(null);
+ }
+
+ public Ice.AsyncResult
+ begin_flushBatchRequests(Callback cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ public Ice.AsyncResult
+ begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ private Ice.AsyncResult
+ begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ {
+ IceInternal.ConnectionBatchOutgoingAsync result =
+ new IceInternal.ConnectionBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb);
+ try
+ {
+ result.__send();
+ }
+ catch(LocalException __ex)
+ {
+ result.__exceptionAsync(__ex);
+ }
+ return result;
+ }
+
+ public void
+ end_flushBatchRequests(AsyncResult r)
+ {
+ AsyncResult.__check(r, this, __flushBatchRequests_name);
+ r.__wait();
+ }
+
synchronized public boolean
flushBatchRequests(IceInternal.BatchOutgoing out)
{
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 67229763453..b4bd577a414 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -732,7 +732,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
public void
- flushBatchRequests()
+ flushAsyncBatchRequests(IceInternal.CommunicatorBatchOutgoingAsync outAsync)
{
java.util.List<IceInternal.IncomingConnectionFactory> f;
synchronized(this)
@@ -741,7 +741,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
for(IceInternal.IncomingConnectionFactory p : f)
{
- p.flushBatchRequests();
+ p.flushAsyncBatchRequests(outAsync);
}
}
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
new file mode 100644
index 00000000000..3e0fbda6ae2
--- /dev/null
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -0,0 +1,134 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class CommunicatorBatchOutgoingAsync extends BatchOutgoingAsync
+{
+ public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
+ CallbackBase callback)
+ {
+ super(instance, operation, callback);
+ _communicator = communicator;
+
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+
+ //
+ // Assume all connections are flushed synchronously.
+ //
+ _sentSynchronously = true;
+ }
+
+ @Override
+ public Ice.Communicator getCommunicator()
+ {
+ return _communicator;
+ }
+
+ public void flushConnection(Ice.Connection con)
+ {
+ synchronized(_monitor)
+ {
+ ++_useCount;
+ }
+ con.begin_flushBatchRequests(_cb);
+ }
+
+ public void ready()
+ {
+ check(null, null, true);
+ }
+
+ private void completed(Ice.AsyncResult r)
+ {
+ Ice.Connection con = r.getConnection();
+ assert(con != null);
+
+ try
+ {
+ con.end_flushBatchRequests(r);
+ assert(false); // completed() should only be called when an exception occurs.
+ }
+ catch(Ice.LocalException ex)
+ {
+ check(r, ex, false);
+ }
+ }
+
+ private void sent(Ice.AsyncResult r)
+ {
+ check(r, null, r.sentSynchronously());
+ }
+
+ private void check(Ice.AsyncResult r, Ice.LocalException ex, boolean userThread)
+ {
+ boolean done = false;
+
+ synchronized(_monitor)
+ {
+ assert(_useCount > 0);
+ --_useCount;
+
+ //
+ // We report that the communicator flush request was sent synchronously
+ // if all of the connection flush requests are sent synchronously.
+ //
+ if((r != null && !r.sentSynchronously()) || ex != null)
+ {
+ _sentSynchronously = false;
+ }
+
+ if(_useCount == 0)
+ {
+ done = true;
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
+ }
+ }
+
+ if(done)
+ {
+ //
+ // sentSynchronously_ is immutable here.
+ //
+ if(!_sentSynchronously && userThread)
+ {
+ __sentAsync();
+ }
+ else
+ {
+ assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
+ __sent();
+ }
+ }
+ }
+
+ private Ice.Communicator _communicator;
+ private int _useCount;
+
+ private Ice.AsyncCallback _cb = new Ice.AsyncCallback()
+ {
+ @Override
+ public void completed(Ice.AsyncResult r)
+ {
+ CommunicatorBatchOutgoingAsync.this.completed(r);
+ }
+
+ @Override
+ public void sent(Ice.AsyncResult r)
+ {
+ CommunicatorBatchOutgoingAsync.this.sent(r);
+ }
+ };
+}
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
new file mode 100644
index 00000000000..637431baa7b
--- /dev/null
+++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
@@ -0,0 +1,40 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
+{
+ public ConnectionBatchOutgoingAsync(Ice.ConnectionI con, Instance instance, String operation, CallbackBase callback)
+ {
+ super(instance, operation, callback);
+ _connection = con;
+ }
+
+ public void __send()
+ {
+ int status = _connection.flushAsyncBatchRequests(this);
+ if((status & AsyncStatus.Sent) > 0)
+ {
+ _sentSynchronously = true;
+ if((status & AsyncStatus.InvokeSentCallback) > 0)
+ {
+ __sent();
+ }
+ }
+ }
+
+ @Override
+ public Ice.Connection getConnection()
+ {
+ return _connection;
+ }
+
+ private Ice.ConnectionI _connection;
+}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 17d046cb4f0..48c7b1aea5f 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -149,13 +149,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
public void
- flushBatchRequests()
+ flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
{
- for(Ice.ConnectionI p : connections()) // connections() is synchronized, no need to synchronize here.
+ for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here.
{
try
{
- p.flushBatchRequests();
+ outAsync.flushConnection(c);
}
catch(Ice.LocalException ex)
{
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index ede817a338c..be0395fa038 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -269,27 +269,6 @@ public final class Instance
return _implicitContext;
}
- public void
- flushBatchRequests()
- {
- OutgoingConnectionFactory connectionFactory;
- ObjectAdapterFactory adapterFactory;
-
- synchronized(this)
- {
- if(_state == StateDestroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- connectionFactory = _outgoingConnectionFactory;
- adapterFactory = _objectAdapterFactory;
- }
-
- connectionFactory.flushBatchRequests();
- adapterFactory.flushBatchRequests();
- }
-
public Ice.Identity
stringToIdentity(String s)
{
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index 381a317e6e5..adda15e2e5d 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -179,7 +179,7 @@ public final class ObjectAdapterFactory
}
public void
- flushBatchRequests()
+ flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
{
java.util.List<Ice.ObjectAdapterI> adapters;
synchronized(this)
@@ -189,7 +189,7 @@ public final class ObjectAdapterFactory
for(Ice.ObjectAdapterI adapter : adapters)
{
- adapter.flushBatchRequests();
+ adapter.flushAsyncBatchRequests(outAsync);
}
}
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 638f84aab15..a3e5f01694e 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -378,22 +378,23 @@ public final class OutgoingConnectionFactory
}
public void
- flushBatchRequests()
+ flushAsyncBatchRequests(CommunicatorBatchOutgoingAsync outAsync)
{
java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>();
synchronized(this)
{
- if(_destroyed)
- {
- return;
- }
-
- for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
+ if(!_destroyed)
{
- for(Ice.ConnectionI connection : connectionList)
+ for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
{
- c.add(connection);
+ for(Ice.ConnectionI connection : connectionList)
+ {
+ if(connection.isActiveOrHolding())
+ {
+ c.add(connection);
+ }
+ }
}
}
}
@@ -402,7 +403,7 @@ public final class OutgoingConnectionFactory
{
try
{
- conn.flushBatchRequests();
+ outAsync.flushConnection(conn);
}
catch(Ice.LocalException ex)
{