summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/demo/Glacier2/callback/Client.java6
-rw-r--r--java/demo/Ice/hello/Client.java4
-rw-r--r--java/demo/Ice/swing/Client.java14
-rw-r--r--java/demo/IceBox/hello/Client.java3
-rw-r--r--java/demo/IceDiscovery/hello/Client.java4
-rw-r--r--java/src/Ice/src/main/java/Ice/BatchRequest.java33
-rw-r--r--java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java28
-rw-r--r--java/src/Ice/src/main/java/Ice/ConnectionI.java313
-rw-r--r--java/src/Ice/src/main/java/Ice/InitializationData.java5
-rw-r--r--java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java107
-rw-r--r--java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java239
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java249
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java30
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java289
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java29
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java44
-rw-r--r--java/src/Ice/src/main/java/IceInternal/FixedReference.java118
-rw-r--r--java/src/Ice/src/main/java/IceInternal/IncomingBase.java33
-rw-r--r--java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java93
-rw-r--r--java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java23
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java31
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java2
-rw-r--r--java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java31
-rw-r--r--java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java75
-rw-r--r--java/src/Ice/src/main/java/IceInternal/Reference.java4
-rw-r--r--java/src/Ice/src/main/java/IceInternal/RequestHandler.java10
-rw-r--r--java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java41
-rw-r--r--java/src/Ice/src/main/java/IceInternal/RoutableReference.java13
-rw-r--r--java/test/src/main/java/test/Ice/ami/AMI.java73
-rw-r--r--java/test/src/main/java/test/Ice/ami/lambda/AMI.java36
-rw-r--r--java/test/src/main/java/test/Ice/background/AllTests.java63
-rw-r--r--java/test/src/main/java/test/Ice/invoke/AllTests.java7
-rw-r--r--java/test/src/main/java/test/Ice/metrics/Client.java1
-rw-r--r--java/test/src/main/java/test/Ice/operations/BatchOneways.java137
-rw-r--r--java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java94
35 files changed, 907 insertions, 1375 deletions
diff --git a/java/demo/Glacier2/callback/Client.java b/java/demo/Glacier2/callback/Client.java
index bb6c8c92d03..07eab42ad0d 100644
--- a/java/demo/Glacier2/callback/Client.java
+++ b/java/demo/Glacier2/callback/Client.java
@@ -182,12 +182,12 @@ public class Client extends Glacier2.Application
if(override != null)
{
context.put("_ovrd", override);
- }
+ }
batchOneway.initiateCallback(onewayR, context);
}
else if(line.equals("f"))
{
- communicator().flushBatchRequests();
+ batchOneway.ice_flushBatchRequests();
}
else if(line.equals("v"))
{
@@ -221,7 +221,7 @@ public class Client extends Glacier2.Application
onewayR.ice_identity(callbackReceiverIdent));
}
- System.out.println("callback receiver identity: " +
+ System.out.println("callback receiver identity: " +
communicator().identityToString(twowayR.ice_getIdentity()));
}
else if(line.equals("s"))
diff --git a/java/demo/Ice/hello/Client.java b/java/demo/Ice/hello/Client.java
index 184e5abb2f0..5a728fc79da 100644
--- a/java/demo/Ice/hello/Client.java
+++ b/java/demo/Ice/hello/Client.java
@@ -132,7 +132,8 @@ public class Client extends Ice.Application
}
else if(line.equals("f"))
{
- communicator().flushBatchRequests();
+ batchOneway.ice_flushBatchRequests();
+ batchDatagram.ice_flushBatchRequests();
}
else if(line.equals("T"))
{
@@ -237,4 +238,3 @@ public class Client extends Ice.Application
System.exit(status);
}
}
-
diff --git a/java/demo/Ice/swing/Client.java b/java/demo/Ice/swing/Client.java
index c4530badc0a..e7bb118f04d 100644
--- a/java/demo/Ice/swing/Client.java
+++ b/java/demo/Ice/swing/Client.java
@@ -460,6 +460,13 @@ public class Client extends JFrame
prx = prx.ice_invocationTimeout(timeout);
}
_helloPrx = Demo.HelloPrxHelper.uncheckedCast(prx);
+
+ //
+ // The batch requests associated to the proxy are lost when we
+ // update the proxy.
+ //
+ _flush.setEnabled(false);
+
_status.setText("Ready");
}
@@ -583,7 +590,12 @@ public class Client extends JFrame
private void flush()
{
- _communicator.begin_flushBatchRequests(new Ice.Callback_Communicator_flushBatchRequests()
+ if(_helloPrx == null)
+ {
+ return;
+ }
+
+ _helloPrx.begin_ice_flushBatchRequests(new Ice.Callback_Object_ice_flushBatchRequests()
{
@Override
public void exception(final Ice.LocalException ex)
diff --git a/java/demo/IceBox/hello/Client.java b/java/demo/IceBox/hello/Client.java
index 7ed2542cde5..f941c513f23 100644
--- a/java/demo/IceBox/hello/Client.java
+++ b/java/demo/IceBox/hello/Client.java
@@ -110,7 +110,8 @@ public class Client extends Ice.Application
}
else if(line.equals("f"))
{
- communicator().flushBatchRequests();
+ batchOneway.ice_flushBatchRequests();
+ batchDatagram.ice_flushBatchRequests();
}
else if(line.equals("x"))
{
diff --git a/java/demo/IceDiscovery/hello/Client.java b/java/demo/IceDiscovery/hello/Client.java
index 36c7cd21702..02997b9c854 100644
--- a/java/demo/IceDiscovery/hello/Client.java
+++ b/java/demo/IceDiscovery/hello/Client.java
@@ -131,7 +131,8 @@ public class Client extends Ice.Application
}
else if(line.equals("f"))
{
- communicator().flushBatchRequests();
+ batchOneway.ice_flushBatchRequests();
+ batchDatagram.ice_flushBatchRequests();
}
else if(line.equals("T"))
{
@@ -236,4 +237,3 @@ public class Client extends Ice.Application
System.exit(status);
}
}
-
diff --git a/java/src/Ice/src/main/java/Ice/BatchRequest.java b/java/src/Ice/src/main/java/Ice/BatchRequest.java
new file mode 100644
index 00000000000..eb31284a98f
--- /dev/null
+++ b/java/src/Ice/src/main/java/Ice/BatchRequest.java
@@ -0,0 +1,33 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+public interface BatchRequest
+{
+ /**
+ * Confirms the queuing of the batch request.
+ **/
+ void enqueue();
+
+ /**
+ * The marshalled size of the request.
+ **/
+ int getSize();
+
+ /**
+ * The name of the operation
+ **/
+ String getOperation();
+
+ /**
+ * The proxy used to invoke the batch request.
+ **/
+ Ice.ObjectPrx getProxy();
+};
diff --git a/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java b/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java
new file mode 100644
index 00000000000..42539a0d4cc
--- /dev/null
+++ b/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java
@@ -0,0 +1,28 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+/**
+ * Base interface for listening to batch request queues.
+ **/
+public interface BatchRequestInterceptor
+{
+ /**
+ * Called by the Ice runtime when a batch request is about to be
+ * added to the batch request queue of a proxy or connection.
+ *
+ * The implementation of this method must call enqueue() on the
+ * request to confirm its addition to the queue, if not called
+ * the request isn't added to the queue. The implementation can
+ * raise an Ice local exception to notify the caller of a failure.
+ *
+ **/
+ void enqueue(Ice.BatchRequest request, int queueBatchRequestCount, int queueBatchRequestSize);
+}
diff --git a/java/src/Ice/src/main/java/Ice/ConnectionI.java b/java/src/Ice/src/main/java/Ice/ConnectionI.java
index bfe8715e339..3a028d88c72 100644
--- a/java/src/Ice/src/main/java/Ice/ConnectionI.java
+++ b/java/src/Ice/src/main/java/Ice/ConnectionI.java
@@ -318,7 +318,7 @@ public final class ConnectionI extends IceInternal.EventHandler
//
setState(StateClosed, new ConnectionTimeoutException());
}
- else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() &&
+ else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue.isEmpty() &&
_asyncRequests.isEmpty())
{
//
@@ -329,7 +329,8 @@ public final class ConnectionI extends IceInternal.EventHandler
}
}
- synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
+ synchronized public int
+ sendAsyncRequest(IceInternal.OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum)
throws IceInternal.RetryException
{
final IceInternal.BasicStream os = out.getOs();
@@ -378,6 +379,11 @@ public final class ConnectionI extends IceInternal.EventHandler
os.pos(IceInternal.Protocol.headerSize);
os.writeInt(requestId);
}
+ else if(batchRequestNum > 0)
+ {
+ os.pos(IceInternal.Protocol.headerSize);
+ os.writeInt(batchRequestNum);
+ }
out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
@@ -403,189 +409,10 @@ public final class ConnectionI extends IceInternal.EventHandler
return status;
}
- public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException
- {
- waitBatchStreamInUse();
-
- if(_exception != null)
- {
- //
- // If there were no batch requests queued when the connection
- // failed, we can safely retry with a new connection. Otherwise, we
- // must throw to notify the caller that some previous batch requests
- // were not sent.
- //
- if(_batchStream.isEmpty())
- {
- throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
- }
- else
- {
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
- }
-
- assert (_state > StateNotValidated);
- assert (_state < StateClosing);
-
- if(_batchStream.isEmpty())
- {
- try
- {
- _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- throw ex;
- }
- }
-
- _batchStreamInUse = true;
- _batchMarker = _batchStream.size();
- _batchStream.swap(os);
-
- //
- // The batch stream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
- //
- }
-
- public void finishBatchRequest(IceInternal.BasicStream os, boolean compress)
- {
- try
- {
- synchronized(this)
- {
- //
- // Get the batch stream back.
- //
- _batchStream.swap(os);
-
- if(_exception != null)
- {
- return;
- }
-
- boolean flush = false;
- if(_batchAutoFlushSize > 0)
- {
- if(_batchStream.size() > _batchAutoFlushSize)
- {
- flush = true;
- }
-
- //
- // Throw memory limit exception if the first message added
- // causes us to go over limit. Otherwise put aside the
- // marshalled message that caused limit to be exceeded and
- // rollback stream to the marker.
- //
- try
- {
- _transceiver.checkSendSize(_batchStream.getBuffer());
- }
- catch(Ice.LocalException ex)
- {
- if(_batchRequestNum > 0)
- {
- flush = true;
- }
- else
- {
- throw ex;
- }
- }
- }
-
- if(flush)
- {
- //
- // Temporarily save the last request.
- //
- byte[] lastRequest = new byte[_batchStream.size() - _batchMarker];
- IceInternal.Buffer buffer = _batchStream.getBuffer();
- buffer.b.position(_batchMarker);
- buffer.b.get(lastRequest);
- _batchStream.resize(_batchMarker, false);
-
- //
- // Send the batch stream without the last request.
- //
- try
- {
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.pos(IceInternal.Protocol.headerSize);
- _batchStream.writeInt(_batchRequestNum);
-
- OutgoingMessage message = new OutgoingMessage(_batchStream, _batchRequestCompress, true);
- sendMessage(message);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- //
- // Reset the batch stream.
- //
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- //
- // Start a new batch with the last message that caused us to
- // go over the limit.
- //
- _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
- _batchStream.writeBlob(lastRequest);
- }
-
- //
- // Increment the number of requests in the batch.
- //
- ++_batchRequestNum;
-
- //
- // We compress the whole batch if there is at least one
- // compressed
- // message.
- //
- if(compress)
- {
- _batchRequestCompress = true;
- }
-
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert (_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
- }
- catch(Ice.LocalException ex)
- {
- abortBatchRequest();
- throw ex;
- }
- }
-
- public synchronized void abortBatchRequest()
+ public IceInternal.BatchRequestQueue
+ getBatchRequestQueue()
{
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- assert (_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ return _batchRequestQueue;
}
@Override
@@ -652,67 +479,6 @@ public final class ConnectionI extends IceInternal.EventHandler
r.__wait();
}
- synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync)
- {
- waitBatchStreamInUse();
-
- if(_exception != null)
- {
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(_batchRequestNum == 0)
- {
- int status = IceInternal.AsyncStatus.Sent;
- if(outAsync.sent())
- {
- status |= IceInternal.AsyncStatus.InvokeSentCallback;
- }
- return status;
- }
-
- //
- // Notify the request that it's cancelable with this connection.
- // This will throw if the request is canceled.
- //
- outAsync.cancelable(this);
-
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.pos(IceInternal.Protocol.headerSize);
- _batchStream.writeInt(_batchRequestNum);
-
- _batchStream.swap(outAsync.getOs());
-
- outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
-
- //
- // Send the batch stream.
- //
- int status;
- try
- {
- OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0);
- status = sendMessage(message);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- //
- // Reset the batch stream.
- //
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- return status;
- }
-
@Override
synchronized public void setCallback(final ConnectionCallback callback)
{
@@ -832,7 +598,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(outAsync instanceof IceInternal.OutgoingAsync)
{
IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync;
- java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
+ java.util.Iterator<IceInternal.OutgoingAsyncBase> it2 = _asyncRequests.values().iterator();
while(it2.hasNext())
{
if(it2.next() == o)
@@ -1536,7 +1302,7 @@ public final class ConnectionI extends IceInternal.EventHandler
_sendStreams.clear();
}
- for(IceInternal.OutgoingAsync p : _asyncRequests.values())
+ for(IceInternal.OutgoingAsyncBase p : _asyncRequests.values())
{
if(p.completed(_exception))
{
@@ -1683,12 +1449,7 @@ public final class ConnectionI extends IceInternal.EventHandler
}
_nextRequestId = 1;
_messageSizeMax = adapter != null ? adapter.messageSizeMax() : instance.messageSizeMax();
- _batchAutoFlushSize = _instance.batchAutoFlushSize();
- _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
- _batchStreamInUse = false;
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
+ _batchRequestQueue = new IceInternal.BatchRequestQueue(instance, _endpoint.datagram());
_readStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
_readHeader = false;
_readStreamPos = -1;
@@ -1913,6 +1674,8 @@ public final class ConnectionI extends IceInternal.EventHandler
return;
}
+ _batchRequestQueue.destroy(_exception);
+
//
// Don't need to close now for connections so only close the transceiver
// if the selector request it.
@@ -2505,7 +2268,7 @@ public final class ConnectionI extends IceInternal.EventHandler
byte compress;
IceInternal.ServantManager servantManager;
ObjectAdapter adapter;
- IceInternal.OutgoingAsync outAsync;
+ IceInternal.OutgoingAsyncBase outAsync;
ConnectionCallback heartbeatCallback;
int messageDispatchCount;
}
@@ -2633,7 +2396,7 @@ public final class ConnectionI extends IceInternal.EventHandler
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
+ IceInternal.OutgoingAsyncBase outAsync = _asyncRequests.remove(info.requestId);
if(outAsync != null && outAsync.completed(info.stream))
{
info.outAsync = outAsync;
@@ -2966,35 +2729,6 @@ public final class ConnectionI extends IceInternal.EventHandler
}
}
- private void waitBatchStreamInUse()
- {
- //
- // This is similar to a mutex lock in that the flag is
- // only true for a short time period. As such we don't permit the
- // wait to be interrupted. Instead the interrupted status is saved
- // and restored.
- //
- boolean interrupted = false;
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException e)
- {
- interrupted = true;
- }
- }
- //
- // Restore the interrupted flag if we were interrupted.
- //
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
-
private int read(IceInternal.Buffer buf)
{
int start = buf.b.position();
@@ -3140,18 +2874,13 @@ public final class ConnectionI extends IceInternal.EventHandler
private int _nextRequestId;
- private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests =
- new java.util.HashMap<Integer, IceInternal.OutgoingAsync>();
+ private java.util.Map<Integer, IceInternal.OutgoingAsyncBase> _asyncRequests =
+ new java.util.HashMap<Integer, IceInternal.OutgoingAsyncBase>();
private LocalException _exception;
private final int _messageSizeMax;
- private final int _batchAutoFlushSize;
- private IceInternal.BasicStream _batchStream;
- private boolean _batchStreamInUse;
- private int _batchRequestNum;
- private boolean _batchRequestCompress;
- private int _batchMarker;
+ private IceInternal.BatchRequestQueue _batchRequestQueue;
private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>();
diff --git a/java/src/Ice/src/main/java/Ice/InitializationData.java b/java/src/Ice/src/main/java/Ice/InitializationData.java
index 434a52dec3f..0244df73b8b 100644
--- a/java/src/Ice/src/main/java/Ice/InitializationData.java
+++ b/java/src/Ice/src/main/java/Ice/InitializationData.java
@@ -83,4 +83,9 @@ public final class InitializationData implements Cloneable
* The compact type ID resolver.
**/
public CompactIdResolver compactIdResolver;
+
+ /**
+ * The batch request interceptor.
+ **/
+ public BatchRequestInterceptor batchRequestInterceptor;
}
diff --git a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java
index 900d23f7e3d..6438cab69e9 100644
--- a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java
@@ -2273,75 +2273,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final Connection
ice_getConnection()
{
- final InvocationObserver observer = IceInternal.ObserverHelper.get(this, "ice_getConnection");
- int cnt = 0;
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
- try
- {
- while(true)
- {
- IceInternal.RequestHandler handler = null;
- try
- {
- handler = __getRequestHandler();
- try
- {
- // Wait for the connection to be established.
- return handler.waitForConnection();
- }
- catch(InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- catch(RetryException e)
- {
- // Clear request handler and retry.
- __setRequestHandler(handler, null);
- }
- catch(Ice.Exception ex)
- {
- try
- {
- Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
- cnt = __handleException(ex, handler, OperationMode.Idempotent, false, interval, cnt);
- if(observer != null)
- {
- observer.retried();
- }
- if(interval.value > 0)
- {
- try
- {
- Thread.sleep(interval.value);
- }
- catch(InterruptedException ex1)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
- catch(Ice.Exception exc)
- {
- if(observer != null)
- {
- observer.failed(exc.ice_name());
- }
- throw exc;
- }
- }
- }
- }
- finally
- {
- if(observer != null)
- {
- observer.detach();
- }
- }
+ return end_ice_getConnection(begin_ice_getConnection());
}
/**
@@ -2632,7 +2564,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
__handleException(Exception ex, IceInternal.RequestHandler handler, OperationMode mode, boolean sent,
Holder<Integer> interval, int cnt)
{
- __setRequestHandler(handler, null); // Clear the request handler
+ __updateRequestHandler(handler, null); // Clear the request handler
//
// We only retry local exception, system exceptions aren't retried.
@@ -2741,7 +2673,6 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final IceInternal.RequestHandler
__getRequestHandler()
{
- IceInternal.RequestHandler handler;
if(_reference.getCacheConnection())
{
synchronized(this)
@@ -2750,19 +2681,40 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
return _requestHandler;
}
- handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this);
- _requestHandler = handler;
}
}
- else
+ return _reference.getRequestHandler(this);
+ }
+
+ synchronized public final IceInternal.BatchRequestQueue
+ __getBatchRequestQueue()
+ {
+ if(_batchRequestQueue == null)
+ {
+ _batchRequestQueue = _reference.getBatchRequestQueue();
+ }
+ return _batchRequestQueue;
+ }
+
+ public IceInternal.RequestHandler
+ __setRequestHandler(IceInternal.RequestHandler handler)
+ {
+ if(_reference.getCacheConnection())
{
- handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this);
+ synchronized(this)
+ {
+ if(_requestHandler == null)
+ {
+ _requestHandler = handler;
+ }
+ return _requestHandler;
+ }
}
- return handler.connect(this);
+ return handler;
}
public void
- __setRequestHandler(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler)
+ __updateRequestHandler(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler)
{
if(_reference.getCacheConnection() && previous != null)
{
@@ -3036,6 +2988,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private transient IceInternal.Reference _reference;
private transient IceInternal.RequestHandler _requestHandler;
+ private transient IceInternal.BatchRequestQueue _batchRequestQueue;
private transient List<StreamCacheEntry> _streamCache;
public static final long serialVersionUID = 0L;
}
diff --git a/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java b/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java
new file mode 100644
index 00000000000..5fc582659bf
--- /dev/null
+++ b/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java
@@ -0,0 +1,239 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public class BatchRequestQueue
+{
+ class BatchRequestI implements Ice.BatchRequest
+ {
+ public void reset(Ice.ObjectPrx proxy, String operation, int size)
+ {
+ _proxy = proxy;
+ _operation = operation;
+ _size = size;
+ }
+
+ @Override
+ public void enqueue()
+ {
+ enqueueBatchRequest();
+ }
+
+ @Override
+ public Ice.ObjectPrx getProxy()
+ {
+ return _proxy;
+ }
+
+ @Override
+ public String getOperation()
+ {
+ return _operation;
+ }
+
+ @Override
+ public int getSize()
+ {
+ return _size;
+ }
+
+ private Ice.ObjectPrx _proxy;
+ private String _operation;
+ private int _size;
+ };
+
+ public
+ BatchRequestQueue(Instance instance, boolean datagram)
+ {
+ Ice.InitializationData initData = instance.initializationData();
+ _interceptor = initData.batchRequestInterceptor;
+ _batchStreamInUse = false;
+ _batchRequestNum = 0;
+ _batchStream = new BasicStream(instance, Protocol.currentProtocolEncoding);
+ _batchStream.writeBlob(Protocol.requestBatchHdr);
+ _batchMarker = _batchStream.size();
+ _request = new BatchRequestI();
+
+ _maxSize = instance.batchAutoFlushSize();
+ if(_maxSize > 0 && datagram)
+ {
+ int udpSndSize = initData.properties.getPropertyAsIntWithDefault("Ice.UDP.SndSize", 65535 - _udpOverhead);
+ if(udpSndSize < _maxSize)
+ {
+ _maxSize = udpSndSize;
+ }
+ }
+ }
+
+ synchronized public void
+ prepareBatchRequest(BasicStream os)
+ {
+ if(_exception != null)
+ {
+ throw (Ice.LocalException)_exception.fillInStackTrace();
+ }
+
+ waitStreamInUse(false);
+ _batchStreamInUse = true;
+ _batchStream.swap(os);
+ }
+
+ public void
+ finishBatchRequest(BasicStream os, Ice.ObjectPrx proxy, String operation)
+ {
+ //
+ // No need for synchronization, no other threads are supposed
+ // to modify the queue since we set _batchStreamInUse to true.
+ //
+ assert(_batchStreamInUse);
+ _batchStream.swap(os);
+
+ try
+ {
+ _batchStreamCanFlush = true; // Allow flush to proceed even if the stream is marked in use.
+
+ if(_maxSize > 0 && _batchStream.size() >= _maxSize)
+ {
+ proxy.begin_ice_flushBatchRequests(); // Auto flush
+ }
+
+ assert(_batchMarker < _batchStream.size());
+ if(_interceptor != null)
+ {
+ _request.reset(proxy, operation, _batchStream.size() - _batchMarker);
+ _interceptor.enqueue(_request, _batchRequestNum, _batchMarker);
+ }
+ else
+ {
+ _batchMarker = _batchStream.size();
+ ++_batchRequestNum;
+ }
+ }
+ finally
+ {
+ synchronized(this)
+ {
+ _batchStream.resize(_batchMarker, false);
+ _batchStreamInUse = false;
+ _batchStreamCanFlush = false;
+ notifyAll();
+ }
+ }
+ }
+
+ synchronized public void
+ abortBatchRequest(BasicStream os)
+ {
+ if(_batchStreamInUse)
+ {
+ _batchStream.swap(os);
+ _batchStream.resize(_batchMarker, false);
+ _batchStreamInUse = false;
+ notifyAll();
+ }
+ }
+
+ synchronized public int
+ swap(BasicStream os)
+ {
+ if(_batchRequestNum == 0)
+ {
+ return 0;
+ }
+
+ waitStreamInUse(true);
+
+ byte[] lastRequest = null;
+ if(_batchMarker < _batchStream.size())
+ {
+ lastRequest = new byte[_batchStream.size() - _batchMarker];
+ Buffer buffer = _batchStream.getBuffer();
+ buffer.b.position(_batchMarker);
+ buffer.b.get(lastRequest);
+ _batchStream.resize(_batchMarker, false);
+ }
+
+ int requestNum = _batchRequestNum;
+ _batchStream.swap(os);
+
+ //
+ // Reset the batch.
+ //
+ _batchRequestNum = 0;
+ _batchStream.writeBlob(Protocol.requestBatchHdr);
+ _batchMarker = _batchStream.size();
+ if(lastRequest != null)
+ {
+ _batchStream.writeBlob(lastRequest);
+ }
+ return requestNum;
+ }
+
+ synchronized public void
+ destroy(Ice.LocalException ex)
+ {
+ _exception = ex;
+ }
+
+ synchronized public boolean
+ isEmpty()
+ {
+ return _batchStream.size() == Protocol.requestBatchHdr.length;
+ }
+
+ private void
+ waitStreamInUse(boolean flush)
+ {
+ //
+ // This is similar to a mutex lock in that the stream is
+ // only "locked" while marshaling. As such we don't permit the wait
+ // to be interrupted. Instead the interrupted status is saved and
+ // restored.
+ //
+ boolean interrupted = false;
+ while(_batchStreamInUse && !(flush && _batchStreamCanFlush))
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void enqueueBatchRequest()
+ {
+ assert(_batchMarker < _batchStream.size());
+ _batchMarker = _batchStream.size();
+ ++_batchRequestNum;
+ }
+
+ private Ice.BatchRequestInterceptor _interceptor;
+ private BasicStream _batchStream;
+ private boolean _batchStreamInUse;
+ private boolean _batchStreamCanFlush;
+ private int _batchRequestNum;
+ private int _batchMarker;
+ private BatchRequestI _request;
+ private Ice.LocalException _exception;
+ private int _maxSize;
+
+ final private static int _udpOverhead = 20 + 8;
+};
diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
index df41457e33c..ad563ecccdd 100644
--- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
@@ -13,13 +13,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
private class InvokeAllAsync extends DispatchWorkItem
{
- private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch)
+ private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int batchRequestNum)
{
_outAsync = outAsync;
_os = os;
_requestId = requestId;
- _invokeNum = invokeNum;
- _batch = batch;
+ _batchRequestNum = batchRequestNum;
}
@Override
@@ -27,15 +26,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
if(sentAsync(_outAsync))
{
- invokeAll(_os, _requestId, _invokeNum, _batch);
+ invokeAll(_os, _requestId, _batchRequestNum);
}
}
private final OutgoingAsyncBase _outAsync;
private BasicStream _os;
private final int _requestId;
- private final int _invokeNum;
- private final boolean _batch;
+ private final int _batchRequestNum;
}
public
@@ -43,23 +41,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_reference = ref;
_dispatcher = ref.getInstance().initializationData().dispatcher != null;
- _response = _reference.getMode() == Reference.ModeTwoway;
_adapter = (Ice.ObjectAdapterI)adapter;
+ _response = _reference.getMode() == Reference.ModeTwoway;
_logger = _reference.getInstance().initializationData().logger; // Cached for better performance.
_traceLevels = _reference.getInstance().traceLevels(); // Cached for better performance.
- _batchAutoFlushSize = ref.getInstance().batchAutoFlushSize();
_requestId = 0;
- _batchStreamInUse = false;
- _batchRequestNum = 0;
- _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding);
- }
-
- @Override
- public RequestHandler
- connect(Ice.ObjectPrxHelperBase proxy)
- {
- return this;
}
@Override
@@ -70,110 +57,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- synchronized public void
- prepareBatchRequest(BasicStream os)
- {
- waitStreamInUse();
- if(_batchStream.isEmpty())
- {
- try
- {
- _batchStream.writeBlob(Protocol.requestBatchHdr);
- }
- catch(Ice.LocalException ex)
- {
- throw ex;
- }
- }
-
- _batchStreamInUse = true;
- _batchMarker = _batchStream.size();
- _batchStream.swap(os);
- }
-
- @Override
- public void
- finishBatchRequest(BasicStream os)
- {
- try
- {
- synchronized(this)
- {
- _batchStream.swap(os);
-
- if(_batchAutoFlushSize > 0 && (_batchStream.size() > _batchAutoFlushSize))
- {
- //
- // Temporarily save the last request.
- //
- byte[] lastRequest = new byte[_batchStream.size() - _batchMarker];
- Buffer buffer = _batchStream.getBuffer();
- buffer.b.position(_batchMarker);
- buffer.b.get(lastRequest);
- _batchStream.resize(_batchMarker, false);
-
- final int invokeNum = _batchRequestNum;
- final BasicStream stream = new BasicStream(_reference.getInstance(),
- Protocol.currentProtocolEncoding);
- stream.swap(_batchStream);
-
- _adapter.getThreadPool().dispatch(
- new DispatchWorkItem()
- {
- @Override
- public void
- run()
- {
- CollocatedRequestHandler.this.invokeAll(stream, 0, invokeNum, true);
- }
- });
-
- //
- // Reset the batch.
- //
- _batchRequestNum = 0;
- _batchMarker = 0;
-
- //
- // Start a new batch with the last message that caused us to go over the limit.
- //
- _batchStream.writeBlob(Protocol.requestBatchHdr);
- _batchStream.writeBlob(lastRequest);
- }
-
- //
- // Increment the number of requests in the batch.
- //
- assert(_batchStreamInUse);
- ++_batchRequestNum;
- _batchStreamInUse = false;
- notifyAll();
- }
- }
- catch(Ice.LocalException ex)
- {
- abortBatchRequest();
- throw ex;
- }
- }
-
- @Override
- synchronized public void
- abortBatchRequest()
- {
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
-
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
-
- @Override
public int
- sendAsyncRequest(OutgoingAsyncBase outAsync)
+ sendAsyncRequest(ProxyOutgoingAsyncBase outAsync)
{
return outAsync.invokeCollocated(this);
}
@@ -200,7 +85,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
OutgoingAsync o = (OutgoingAsync)outAsync;
assert(o != null);
- for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet())
+ for(java.util.Map.Entry<Integer, OutgoingAsyncBase> e : _asyncRequests.entrySet())
{
if(e.getValue() == o)
{
@@ -219,7 +104,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
public void
sendResponse(int requestId, final BasicStream os, byte status, boolean amd)
{
- OutgoingAsync outAsync = null;
+ OutgoingAsyncBase outAsync = null;
synchronized(this)
{
assert(_response);
@@ -276,7 +161,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public void
- invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd)
+ invokeException(int requestId, Ice.LocalException ex, int batchRequestNum, boolean amd)
{
handleException(requestId, ex, amd);
_adapter.decDirectCount();
@@ -296,14 +181,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return null;
}
- @Override
- public Ice.ConnectionI
- waitForConnection()
- {
- return null;
- }
-
- int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
+ int invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, boolean synchronous)
{
int requestId = 0;
synchronized(this)
@@ -315,6 +193,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
requestId = ++_requestId;
_asyncRequests.put(requestId, outAsync);
}
+
_sendAsyncRequests.put(outAsync, requestId);
}
@@ -325,75 +204,33 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
//
// Treat this collocated call as if it is a synchronous invocation.
//
- if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response)
+ if(!_response || _reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
{
// Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ _adapter.getThreadPool().dispatch(
+ new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum));
}
else if(_dispatcher)
{
_adapter.getThreadPool().dispatchFromThisThread(
- new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum));
}
else // Optimization: directly call invokeAll if there's no dispatcher.
{
if(sentAsync(outAsync))
{
- invokeAll(outAsync.getOs(), requestId, 1, false);
+ invokeAll(outAsync.getOs(), requestId, batchRequestNum);
}
}
}
else
{
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ _adapter.getThreadPool().dispatch(
+ new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum));
}
return AsyncStatus.Queued;
}
- int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync)
- {
- int invokeNum;
- synchronized(this)
- {
- waitStreamInUse();
-
- invokeNum = _batchRequestNum;
- if(_batchRequestNum > 0)
- {
- outAsync.cancelable(this); // This will throw if the request is canceled
-
- _sendAsyncRequests.put(outAsync, 0);
-
- assert(!_batchStream.isEmpty());
- _batchStream.swap(outAsync.getOs());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- outAsync.attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true));
- return AsyncStatus.Queued;
- }
- else if(outAsync.sent())
- {
- return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
- }
- else
- {
- return AsyncStatus.Sent;
- }
- }
-
private boolean
sentAsync(final OutgoingAsyncBase outAsync)
{
@@ -420,9 +257,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private void
- invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch)
+ invokeAll(BasicStream os, int requestId, int batchRequestNum)
{
- if(batch)
+ if(batchRequestNum > 0)
{
os.pos(Protocol.requestBatchHdr.length);
}
@@ -438,13 +275,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
fillInValue(os, Protocol.headerSize, requestId);
}
- else if(batch)
+ else if(batchRequestNum > 0)
{
- fillInValue(os, Protocol.headerSize, invokeNum);
+ fillInValue(os, Protocol.headerSize, batchRequestNum);
}
TraceUtil.traceSend(os, _logger, _traceLevels);
}
+ int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1;
ServantManager servantManager = _adapter.getServantManager();
try
{
@@ -502,7 +340,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return; // Ignore exception for oneway messages.
}
- OutgoingAsync outAsync = null;
+ OutgoingAsyncBase outAsync = null;
synchronized(this)
{
outAsync = _asyncRequests.remove(requestId);
@@ -531,36 +369,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private void
- waitStreamInUse()
- {
- //
- // This is similar to a mutex lock in that the stream is
- // only "locked" while marshaling. As such we don't permit the wait
- // to be interrupted. Instead the interrupted status is saved and
- // restored.
- //
- boolean interrupted = false;
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- interrupted = true;
- }
- }
- //
- // Restore the interrupted flag if we were interrupted.
- //
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
-
- private void
fillInValue(BasicStream os, int pos, int value)
{
os.rewriteInt(value, pos);
@@ -572,7 +380,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private final Ice.ObjectAdapterI _adapter;
private final Ice.Logger _logger;
private final TraceLevels _traceLevels;
- private int _batchAutoFlushSize;
private int _requestId;
@@ -582,10 +389,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests =
new java.util.HashMap<OutgoingAsyncBase, Integer>();
- private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>();
-
- private BasicStream _batchStream;
- private boolean _batchStreamInUse;
- private int _batchRequestNum;
- private int _batchMarker;
+ private java.util.Map<Integer, OutgoingAsyncBase> _asyncRequests =
+ new java.util.HashMap<Integer, OutgoingAsyncBase>();
}
diff --git a/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
index b4134c9dd1f..242341fb159 100644
--- a/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
+++ b/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
@@ -49,9 +49,9 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
{
public FlushBatch()
{
- super(CommunicatorFlushBatch.this.getCommunicator(),
- CommunicatorFlushBatch.this._instance,
- CommunicatorFlushBatch.this.getOperation(),
+ super(CommunicatorFlushBatch.this.getCommunicator(),
+ CommunicatorFlushBatch.this._instance,
+ CommunicatorFlushBatch.this.getOperation(),
null);
}
@@ -81,7 +81,7 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
return false;
}
- @Override
+ @Override
protected Ice.Instrumentation.InvocationObserver getObserver()
{
return CommunicatorFlushBatch.this._observer;
@@ -95,22 +95,34 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
try
{
- if(_instance.queueRequests())
+ final FlushBatch flushBatch = new FlushBatch();
+ final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
+ if(batchRequestNum == 0)
+ {
+ flushBatch.sent();
+ }
+ else if(_instance.queueRequests())
{
- _instance.getQueueExecutor().executeNoThrow(new Callable<Integer>()
+ _instance.getQueueExecutor().executeNoThrow(new Callable<Void>()
{
@Override
- public Integer call()
+ public Void call() throws RetryException
{
- return con.flushAsyncBatchRequests(new FlushBatch());
+ con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ return null;
}
});
}
else
{
- con.flushAsyncBatchRequests(new FlushBatch());
+ con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
}
}
+ catch(RetryException ex)
+ {
+ doCheck(false);
+ throw ex.get();
+ }
catch(Ice.LocalException ex)
{
doCheck(false);
diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java
index 6b96a08707a..198625eeef8 100644
--- a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java
@@ -16,136 +16,41 @@ import java.util.concurrent.Callable;
public class ConnectRequestHandler
implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback
{
- static private class Request
- {
- Request(BasicStream os)
- {
- this.os = new BasicStream(os.instance(), Protocol.currentProtocolEncoding);
- this.os.swap(os);
- }
-
- Request(OutgoingAsyncBase out)
- {
- this.outAsync = out;
- }
-
- OutgoingAsyncBase outAsync = null;
- BasicStream os = null;
- }
-
- @Override
- public RequestHandler
+ synchronized public RequestHandler
connect(Ice.ObjectPrxHelperBase proxy)
{
- //
- // Initiate the connection if connect() is called by the proxy that
- // created the handler.
- //
- if(proxy == _proxy && _connect)
- {
- _connect = false; // Call getConnection only once
- _reference.getConnection(this);
- }
-
try
{
- synchronized(this)
- {
- if(!initialized())
- {
- _proxies.add(proxy);
- return this;
- }
- }
- }
- catch(Ice.LocalException ex)
- {
- throw ex;
- }
-
- if(_connectionRequestHandler != null)
- {
- proxy.__setRequestHandler(this, _connectionRequestHandler);
- return _connectionRequestHandler;
- }
- else
- {
- return this;
- }
- }
-
- @Override
- public RequestHandler
- update(RequestHandler previousHandler, RequestHandler newHandler)
- {
- return previousHandler == this ? newHandler : this;
- }
-
- @Override
- public void
- prepareBatchRequest(BasicStream os)
- throws RetryException
- {
- synchronized(this)
- {
- waitBatchRequestInProgress();
-
if(!initialized())
{
- _batchRequestInProgress = true;
- _batchStream.swap(os);
- return;
+ _proxies.add(proxy);
}
}
-
- _connection.prepareBatchRequest(os);
- }
-
- @Override
- public void
- finishBatchRequest(BasicStream os)
- {
- synchronized(this)
+ catch(Ice.LocalException ex)
{
- if(!initialized()) // This can't throw until _batchRequestInProgress = false
+ //
+ // Only throw if the connection didn't get established. If
+ // it died after being established, we allow the caller to
+ // retry the connection establishment by not throwing here.
+ //
+ if(_connection == null)
{
- assert(_batchRequestInProgress);
- _batchRequestInProgress = false;
- notifyAll();
-
- _batchStream.swap(os);
-
- _requests.add(new Request(_batchStream));
- return;
+ throw ex;
}
}
- _connection.finishBatchRequest(os, _compress);
+ return _requestHandler;
}
@Override
- public void
- abortBatchRequest()
+ public RequestHandler
+ update(RequestHandler previousHandler, RequestHandler newHandler)
{
- synchronized(this)
- {
- if(!initialized()) // This can't throw until _batchRequestInProgress = false
- {
- assert(_batchRequestInProgress);
- _batchRequestInProgress = false;
- notifyAll();
-
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding);
- _batchStream.swap(dummy);
-
- return;
- }
- }
- _connection.abortBatchRequest();
+ return previousHandler == this ? newHandler : this;
}
@Override
public int
- sendAsyncRequest(OutgoingAsyncBase out)
+ sendAsyncRequest(ProxyOutgoingAsyncBase out)
throws RetryException
{
synchronized(this)
@@ -159,7 +64,7 @@ public class ConnectRequestHandler
{
if(!initialized())
{
- _requests.add(new Request(out));
+ _requests.add(out);
return AsyncStatus.Queued;
}
}
@@ -168,7 +73,7 @@ public class ConnectRequestHandler
throw new RetryException(ex);
}
}
- return out.send(_connection, _compress, _response);
+ return out.invokeRemote(_connection, _compress, _response);
}
@Override
@@ -184,11 +89,11 @@ public class ConnectRequestHandler
if(!initialized())
{
- java.util.Iterator<Request> it = _requests.iterator();
+ java.util.Iterator<ProxyOutgoingAsyncBase> it = _requests.iterator();
while(it.hasNext())
{
- Request request = it.next();
- if(request.outAsync == outAsync)
+ OutgoingAsyncBase request = it.next();
+ if(request == outAsync)
{
it.remove();
if(outAsync.completed(ex))
@@ -225,26 +130,6 @@ public class ConnectRequestHandler
}
}
- @Override
- synchronized public
- ConnectionI waitForConnection()
- throws InterruptedException, RetryException
- {
- if(_exception != null)
- {
- throw new RetryException(_exception);
- }
-
- //
- // Wait for the connection establishment to complete or fail.
- //
- while(!_initialized && _exception == null)
- {
- wait();
- }
- return getConnection();
- }
-
//
// Implementation of Reference.GetConnectionCallback
//
@@ -300,14 +185,11 @@ public class ConnectRequestHandler
// Ignore
}
- for(Request request : _requests)
+ for(OutgoingAsyncBase outAsync : _requests)
{
- if(request.outAsync != null)
+ if(outAsync.completed(_exception))
{
- if(request.outAsync.completed(_exception))
- {
- request.outAsync.invokeCompletedAsync();
- }
+ outAsync.invokeCompletedAsync();
}
}
_requests.clear();
@@ -332,13 +214,19 @@ public class ConnectRequestHandler
ConnectRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy)
{
_reference = ref;
- _connect = true;
_response = _reference.getMode() == Reference.ModeTwoway;
_proxy = (Ice.ObjectPrxHelperBase)proxy;
_initialized = false;
_flushing = false;
- _batchRequestInProgress = false;
- _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding);
+
+ if(_reference.getInstance().queueRequests())
+ {
+ _requestHandler = new QueueRequestHandler(_reference.getInstance(), this);
+ }
+ else
+ {
+ _requestHandler = this;
+ }
}
private boolean
@@ -415,7 +303,6 @@ public class ConnectRequestHandler
synchronized(this)
{
assert(_connection != null && !_initialized);
- waitBatchRequestInProgress();
//
// We set the _flushing flag to true to prevent any additional queuing. Callers
@@ -425,61 +312,34 @@ public class ConnectRequestHandler
_flushing = true;
}
- java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true
Ice.LocalException exception = null;
- while(p.hasNext())
+ for(ProxyOutgoingAsyncBase outAsync : _requests)
{
- Request request = p.next();
try
{
- if(request.os != null)
- {
- BasicStream os = new BasicStream(request.os.instance(), Protocol.currentProtocolEncoding);
- _connection.prepareBatchRequest(os);
- try
- {
- request.os.pos(0);
- os.writeBlob(request.os.readBlob(request.os.size()));
- }
- catch(Ice.LocalException ex)
- {
- _connection.abortBatchRequest();
- throw ex;
- }
- _connection.finishBatchRequest(os, _compress);
- }
- else if((request.outAsync.send(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0)
+ if((outAsync.invokeRemote(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0)
{
- request.outAsync.invokeSentAsync();
+ outAsync.invokeSentAsync();
}
}
catch(RetryException ex)
{
exception = ex.get();
- try
- {
- // Remove the request handler before retrying.
- _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this);
- }
- catch(Ice.CommunicatorDestroyedException exc)
- {
- // Ignore
- }
- if(request.outAsync != null)
- {
- request.outAsync.retryException(ex.get());
- }
+
+ // Remove the request handler before retrying.
+ _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this);
+ outAsync.retryException(ex.get());
}
catch(Ice.LocalException ex)
{
exception = ex;
- if(request.outAsync != null && request.outAsync.completed(ex))
+ if(outAsync.completed(ex))
{
- request.outAsync.invokeCompletedAsync();
+ outAsync.invokeCompletedAsync();
}
}
- p.remove();
}
+ _requests.clear();
//
// If we aren't caching the connection, don't bother creating a
@@ -489,10 +349,14 @@ public class ConnectRequestHandler
//
if(_reference.getCacheConnection() && exception == null)
{
- _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
+ _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
+ if(_reference.getInstance().queueRequests())
+ {
+ _requestHandler = new QueueRequestHandler(_reference.getInstance(), _requestHandler);
+ }
for(Ice.ObjectPrxHelperBase proxy : _proxies)
{
- proxy.__setRequestHandler(this, _connectionRequestHandler);
+ proxy.__updateRequestHandler(this, _requestHandler);
}
}
@@ -502,56 +366,22 @@ public class ConnectRequestHandler
_exception = exception;
_initialized = _exception == null;
_flushing = false;
- try
- {
- //
- // Only remove once all the requests are flushed to
- // guarantee serialization.
- //
- _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this);
- }
- catch(Ice.CommunicatorDestroyedException ex)
- {
- // Ignore
- }
+
+ //
+ // Only remove once all the requests are flushed to
+ // guarantee serialization.
+ //
+ _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this);
+
_proxies.clear();
_proxy = null; // Break cyclic reference count.
notifyAll();
}
}
- private void
- waitBatchRequestInProgress()
- {
- //
- // This is similar to a mutex lock in that the stream is
- // only "locked" while the request is in progress.
- //
- boolean interrupted = false;
- while(_batchRequestInProgress)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- interrupted = true;
- }
- }
- //
- // Restore the interrupted flag if we were interrupted.
- //
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
-
private final Reference _reference;
- private boolean _connect;
private boolean _response;
-
+
private Ice.ObjectPrxHelperBase _proxy;
private java.util.Set<Ice.ObjectPrxHelperBase> _proxies = new java.util.HashSet<Ice.ObjectPrxHelperBase>();
@@ -561,9 +391,6 @@ public class ConnectRequestHandler
private boolean _initialized;
private boolean _flushing;
- private java.util.List<Request> _requests = new java.util.LinkedList<Request>();
- private boolean _batchRequestInProgress;
- private BasicStream _batchStream;
-
- private RequestHandler _connectionRequestHandler;
+ private java.util.List<ProxyOutgoingAsyncBase> _requests = new java.util.LinkedList<ProxyOutgoingAsyncBase>();
+ private RequestHandler _requestHandler;
}
diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
index 0a52e4162aa..7f9ed1de6f5 100644
--- a/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
+++ b/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
@@ -41,28 +41,38 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase
{
return _connection;
}
-
+
public void invoke()
{
try
{
+ final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os);
+
int status;
- if(_instance.queueRequests())
+ if(batchRequestNum == 0)
+ {
+ status = IceInternal.AsyncStatus.Sent;
+ if(sent())
+ {
+ status |= IceInternal.AsyncStatus.InvokeSentCallback;
+ }
+ }
+ else if(_instance.queueRequests())
{
status = _instance.getQueueExecutor().executeNoThrow(new Callable<Integer>()
{
@Override
- public Integer call()
+ public Integer call() throws RetryException
{
- return _connection.flushAsyncBatchRequests(ConnectionFlushBatch.this);
+ return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum);
}
});
}
else
{
- status = _connection.flushAsyncBatchRequests(this);
+ status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
}
-
+
if((status & AsyncStatus.Sent) > 0)
{
_sentSynchronously = true;
@@ -72,6 +82,13 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase
}
}
}
+ catch(RetryException ex)
+ {
+ if(completed(ex.get()))
+ {
+ invokeCompletedAsync();
+ }
+ }
catch(Ice.Exception ex)
{
if(completed(ex))
diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java
index 36edd3bfcf8..398cc0836f0 100644
--- a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java
@@ -12,14 +12,7 @@ package IceInternal;
public class ConnectionRequestHandler implements RequestHandler
{
@Override
- public RequestHandler
- connect(Ice.ObjectPrxHelperBase proxy)
- {
- return this;
- }
-
- @Override
- public RequestHandler
+ public RequestHandler
update(RequestHandler previousHandler, RequestHandler newHandler)
{
try
@@ -44,34 +37,12 @@ public class ConnectionRequestHandler implements RequestHandler
}
return this;
}
-
- @Override
- public void
- prepareBatchRequest(BasicStream out)
- throws RetryException
- {
- _connection.prepareBatchRequest(out);
- }
@Override
- public void
- finishBatchRequest(BasicStream out)
- {
- _connection.finishBatchRequest(out, _compress);
- }
-
- @Override
- public void
- abortBatchRequest()
- {
- _connection.abortBatchRequest();
- }
-
- @Override
- public int sendAsyncRequest(OutgoingAsyncBase out)
+ public int sendAsyncRequest(ProxyOutgoingAsyncBase out)
throws RetryException
{
- return out.send(_connection, _compress, _response);
+ return out.invokeRemote(_connection, _compress, _response);
}
@Override
@@ -95,14 +66,7 @@ public class ConnectionRequestHandler implements RequestHandler
return _connection;
}
- @Override
- public Ice.ConnectionI
- waitForConnection()
- {
- return _connection;
- }
-
- public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress)
+ public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress)
{
_reference = ref;
_response = _reference.getMode() == Reference.ModeTwoway;
diff --git a/java/src/Ice/src/main/java/IceInternal/FixedReference.java b/java/src/Ice/src/main/java/IceInternal/FixedReference.java
index 6bf2afc52b5..20c108471f8 100644
--- a/java/src/Ice/src/main/java/IceInternal/FixedReference.java
+++ b/java/src/Ice/src/main/java/IceInternal/FixedReference.java
@@ -210,75 +210,81 @@ public class FixedReference extends Reference
}
@Override
- public void
- getConnection(GetConnectionCallback callback)
+ public RequestHandler
+ getRequestHandler(Ice.ObjectPrxHelperBase proxy)
{
- try
+ switch(getMode())
+ {
+ case Reference.ModeTwoway:
+ case Reference.ModeOneway:
+ case Reference.ModeBatchOneway:
{
- switch(getMode())
+ if(_fixedConnection.endpoint().datagram())
{
- case Reference.ModeTwoway:
- case Reference.ModeOneway:
- case Reference.ModeBatchOneway:
- {
- if(_fixedConnection.endpoint().datagram())
- {
- throw new Ice.NoEndpointException("");
- }
- break;
- }
-
- case Reference.ModeDatagram:
- case Reference.ModeBatchDatagram:
- {
- if(!_fixedConnection.endpoint().datagram())
- {
- throw new Ice.NoEndpointException("");
- }
- break;
- }
+ throw new Ice.NoEndpointException("");
}
+ break;
+ }
- //
- // If a secure connection is requested or secure overrides is set,
- // check if the connection is secure.
- //
- boolean secure;
- DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
- if(defaultsAndOverrides.overrideSecure)
- {
- secure = defaultsAndOverrides.overrideSecureValue;
- }
- else
- {
- secure = getSecure();
- }
- if(secure && !_fixedConnection.endpoint().secure())
+ case Reference.ModeDatagram:
+ case Reference.ModeBatchDatagram:
+ {
+ if(!_fixedConnection.endpoint().datagram())
{
throw new Ice.NoEndpointException("");
}
+ break;
+ }
+ }
+
+ //
+ // If a secure connection is requested or secure overrides is set,
+ // check if the connection is secure.
+ //
+ boolean secure;
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideSecure)
+ {
+ secure = defaultsAndOverrides.overrideSecureValue;
+ }
+ else
+ {
+ secure = getSecure();
+ }
+ if(secure && !_fixedConnection.endpoint().secure())
+ {
+ throw new Ice.NoEndpointException("");
+ }
- _fixedConnection.throwException(); // Throw in case our connection is already destroyed.
+ _fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- boolean compress;
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else if(_overrideCompress)
- {
- compress = _compress;
- }
- else
- {
- compress = _fixedConnection.endpoint().compress();
- }
- callback.setConnection(_fixedConnection, compress);
+ boolean compress;
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
}
- catch(Ice.LocalException ex)
+ else if(_overrideCompress)
{
- callback.setException(ex);
+ compress = _compress;
}
+ else
+ {
+ compress = _fixedConnection.endpoint().compress();
+ }
+
+ RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress);
+ if(getInstance().queueRequests())
+ {
+ handler = new QueueRequestHandler(getInstance(), handler);
+ }
+ return proxy.__setRequestHandler(handler);
+ }
+
+ @Override
+ public BatchRequestQueue
+ getBatchRequestQueue()
+ {
+ return _fixedConnection.getBatchRequestQueue();
}
@Override
diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
index 0ce16f152ec..b2c6871a14a 100644
--- a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
+++ b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java
@@ -93,21 +93,15 @@ class IncomingBase
public BasicStream
__startWriteParams(Ice.FormatType format)
{
- if(_response)
+ if(!_response)
{
- assert(_os.size() == Protocol.headerSize + 4); // Reply status position.
- assert(_current.encoding != null); // Encoding for reply is known.
- _os.writeByte((byte)0);
- _os.startWriteEncaps(_current.encoding, format);
+ throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch");
}
- //
- // We still return the stream even if no response is expected. The
- // servant code might still write some out parameters if for
- // example a method with out parameters somehow and erroneously
- // invoked as oneway (or if the invocation is invoked on a
- // blobject and the blobject erroneously writes a response).
- //
+ assert(_os.size() == Protocol.headerSize + 4); // Reply status position.
+ assert(_current.encoding != null); // Encoding for reply is known.
+ _os.writeByte((byte)0);
+ _os.startWriteEncaps(_current.encoding, format);
return _os;
}
@@ -119,14 +113,13 @@ class IncomingBase
_observer.userException();
}
- if(_response)
- {
- int save = _os.pos();
- _os.pos(Protocol.headerSize + 4); // Reply status position.
- _os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
- _os.pos(save);
- _os.endWriteEncaps();
- }
+ assert(_response);
+
+ int save = _os.pos();
+ _os.pos(Protocol.headerSize + 4); // Reply status position.
+ _os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException);
+ _os.pos(save);
+ _os.endWriteEncaps();
}
public void
diff --git a/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java b/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java
index 080f81b68a2..5c6371d4ab1 100644
--- a/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java
+++ b/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java
@@ -65,31 +65,7 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
case Reference.ModeBatchOneway:
case Reference.ModeBatchDatagram:
{
- while(true)
- {
- try
- {
- _handler = _proxy.__getRequestHandler();
- _handler.prepareBatchRequest(_os);
- break;
- }
- catch(RetryException ex)
- {
- // Clear request handler and retry.
- _proxy.__setRequestHandler(_handler, null);
- }
- catch(Ice.LocalException ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- // Clear request handler
- _proxy.__setRequestHandler(_handler, null);
- _handler = null;
- throw ex;
- }
- }
+ _proxy.__getBatchRequestQueue().prepareBatchRequest(_os);
break;
}
}
@@ -149,10 +125,10 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
}
@Override
- public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
+ public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
_cachedConnection = connection;
- return connection.sendAsyncRequest(this, compress, response);
+ return connection.sendAsyncRequest(this, compress, response, 0);
}
@Override
@@ -164,7 +140,7 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
// Disable caching by marking the streams as cached!
_state |= StateCachedBuffers;
}
- return handler.invokeAsyncRequest(this, _synchronous);
+ return handler.invokeAsyncRequest(this, 0, _synchronous);
}
@Override
@@ -173,15 +149,12 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
int mode = _proxy.__reference().getMode();
if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- if(_handler != null)
- {
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- _handler.abortBatchRequest();
- }
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ _proxy.__getBatchRequestQueue().abortBatchRequest(_os);
}
super.abort(ex);
@@ -192,23 +165,25 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
int mode = _proxy.__reference().getMode();
if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
{
- if(_handler != null)
- {
- _sentSynchronously = true;
- _handler.finishBatchRequest(_os);
- finished(true);
- }
- return; // Don't call sent/completed callback for batch AMI requests
+ //
+ // NOTE: we don't call sent/completed callbacks for batch AMI requests
+ //
+ _sentSynchronously = true;
+ _proxy.__getBatchRequestQueue().finishBatchRequest(_os, _proxy, getOperation());
+ finished(true);
+ }
+ else
+ {
+ //
+ // NOTE: invokeImpl doesn't throw so this can be called from the
+ // try block with the catch block calling abort() in case of an
+ // exception.
+ //
+ invokeImpl(true); // userThread = true
}
-
- //
- // NOTE: invokeImpl doesn't throw so this can be called from the
- // try block with the catch block calling abort() in case of an
- // exception.
- //
- invokeImpl(true); // userThread = true
}
+ @Override
public final boolean completed(BasicStream is)
{
//
@@ -218,14 +193,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
//
assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
-
+
if(_childObserver != null)
{
_childObserver.reply(is.size() - Protocol.headerSize - 4);
_childObserver.detach();
_childObserver = null;
}
-
+
byte replyStatus;
try
{
@@ -236,14 +211,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
}
_is.swap(is);
replyStatus = _is.readByte();
-
+
switch(replyStatus)
{
case ReplyStatus.replyOK:
{
break;
}
-
+
case ReplyStatus.replyUserException:
{
if(_observer != null)
@@ -252,14 +227,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
}
break;
}
-
+
case ReplyStatus.replyObjectNotExist:
case ReplyStatus.replyFacetNotExist:
case ReplyStatus.replyOperationNotExist:
{
Ice.Identity id = new Ice.Identity();
id.__read(_is);
-
+
//
// For compatibility with the old FacetPath.
//
@@ -277,9 +252,9 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase
{
facet = "";
}
-
+
String operation = _is.readString();
-
+
Ice.RequestFailedException ex = null;
switch(replyStatus)
{
diff --git a/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java
index e89bf01f5c9..48b231b22da 100644
--- a/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java
+++ b/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java
@@ -16,31 +16,20 @@ package IceInternal;
//
public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI
{
- public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
- {
- assert(false); // This should be overriden if this object is used with a request handler
- return AsyncStatus.Queued;
- }
-
- public int invokeCollocated(CollocatedRequestHandler handler)
- {
- assert(false); // This should be overriden if this object is used with a request handler
- return AsyncStatus.Queued;
- }
-
public boolean sent()
{
return sent(true);
}
- public boolean completed(Ice.Exception ex)
+ public boolean completed(BasicStream is)
{
- return finished(ex);
+ assert(false); // Must be implemented by classes that handle responses
+ return false;
}
- public void retryException(Ice.Exception ex)
+ public boolean completed(Ice.Exception ex)
{
- assert(false);
+ return finished(ex);
}
public final void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
@@ -55,7 +44,7 @@ public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI
}
}
}
-
+
public final void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
{
if(_observer != null)
diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
index 865be583cc2..78c9f69b174 100644
--- a/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
+++ b/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
@@ -28,19 +28,28 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase
{
super(prx, operation, callback);
_observer = ObserverHelper.get(prx, operation);
+ _batchRequestNum = prx.__getBatchRequestQueue().swap(_os);
}
@Override
- public int send(Ice.ConnectionI connection, boolean compress, boolean response)
+ public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
+ if(_batchRequestNum == 0)
+ {
+ return sent() ? AsyncStatus.Sent | AsyncStatus.InvokeSentCallback : AsyncStatus.Sent;
+ }
_cachedConnection = connection;
- return connection.flushAsyncBatchRequests(this);
+ return connection.sendAsyncRequest(this, compress, false, _batchRequestNum);
}
@Override
public int invokeCollocated(CollocatedRequestHandler handler)
{
- return handler.invokeAsyncBatchRequests(this);
+ if(_batchRequestNum == 0)
+ {
+ return sent() ? AsyncStatus.Sent | AsyncStatus.InvokeSentCallback : AsyncStatus.Sent;
+ }
+ return handler.invokeAsyncRequest(this, _batchRequestNum, false);
}
public void invoke()
@@ -48,18 +57,6 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
invokeImpl(true); // userThread = true
}
-
- @Override
- protected void handleRetryException(Ice.Exception exc)
- {
- _proxy.__setRequestHandler(_handler, null); // Clear request handler
- throw exc; // No retries, we want to notify the user of potentially lost batch requests
- }
-
- @Override
- protected int handleException(Ice.Exception exc)
- {
- _proxy.__setRequestHandler(_handler, null); // Clear request handler
- throw exc; // No retries, we want to notify the user of potentially lost batch requests
- }
+
+ protected int _batchRequestNum;
}
diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java b/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java
index 16069d4c423..753a921eb75 100644
--- a/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java
+++ b/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java
@@ -31,7 +31,7 @@ public class ProxyGetConnection extends ProxyOutgoingAsyncBase
}
@Override
- public int send(Ice.ConnectionI connection, boolean compress, boolean response)
+ public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response)
throws RetryException
{
_cachedConnection = connection;
diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java
index 8a3a3d1e45e..77d5c1cf0fc 100644
--- a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java
+++ b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java
@@ -30,6 +30,10 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
}
+ public abstract int invokeRemote(Ice.ConnectionI con, boolean compress, boolean response) throws RetryException;
+
+ public abstract int invokeCollocated(CollocatedRequestHandler handler);
+
@Override
public Ice.ObjectPrx getProxy()
{
@@ -45,7 +49,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
_childObserver.detach();
_childObserver = null;
}
-
+
//
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
@@ -66,7 +70,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
}
- @Override
public void retryException(Ice.Exception ex)
{
try
@@ -77,7 +80,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
// require could end up waiting for the flush of the
// connection to be done.
//
- handleRetryException(ex);
+ _proxy.__updateRequestHandler(_handler, null); // Clear request handler and always retry.
_instance.retryQueue().add(this, 0);
}
catch(Ice.Exception exc)
@@ -93,7 +96,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
{
invokeImpl(false);
}
-
+
public void cancelable(final CancellationHandler handler)
{
if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null)
@@ -114,7 +117,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
super.cancelable(handler);
}
-
+
public void abort(Ice.Exception ex)
{
assert(_childObserver == null);
@@ -141,7 +144,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
_cnt = 0;
_sent = false;
}
-
+
protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate, BasicStream os)
{
super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate, os);
@@ -162,7 +165,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
return r;
}
-
+
protected void invokeImpl(boolean userThread)
{
try
@@ -196,6 +199,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
try
{
_sent = false;
+ _handler = null;
_handler = _proxy.__getRequestHandler();
int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
@@ -220,7 +224,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
catch(RetryException ex)
{
- handleRetryException(ex.get());
+ _proxy.__updateRequestHandler(_handler, null); // Clear request handler and always retry.
}
catch(Ice.Exception ex)
{
@@ -248,8 +252,8 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
//
// If called from the user thread we re-throw, the exception
// will be catch by the caller and abort() will be called.
- //
- if(userThread)
+ //
+ if(userThread)
{
throw ex;
}
@@ -297,18 +301,13 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
return super.finished(ok);
}
- protected void handleRetryException(Ice.Exception exc)
- {
- _proxy.__setRequestHandler(_handler, null); // Clear request handler and always retry.
- }
-
protected int handleException(Ice.Exception exc)
{
Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
_cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
return interval.value;
}
-
+
final protected Ice.ObjectPrxHelperBase _proxy;
protected RequestHandler _handler;
protected Ice.OperationMode _mode;
diff --git a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java
index 1b5a87578cb..6952862e508 100644
--- a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java
@@ -24,29 +24,13 @@ public class QueueRequestHandler implements RequestHandler
}
@Override
- public RequestHandler
- connect(final Ice.ObjectPrxHelperBase proxy)
- {
- _executor.executeNoThrow(new Callable<Void>()
- {
- @Override
- public Void call()
- {
- _delegate.connect(proxy);
- return null;
- }
- });
- return this;
- }
-
- @Override
- public RequestHandler
+ public RequestHandler
update(RequestHandler previousHandler, RequestHandler newHandler)
{
//
// Only update to new handler if the previous handler matches this one.
//
- if(previousHandler == this)
+ if(previousHandler == this || previousHandler == _delegate)
{
if(newHandler != null)
{
@@ -59,55 +43,10 @@ public class QueueRequestHandler implements RequestHandler
}
return this;
}
-
- @Override
- public void
- prepareBatchRequest(final BasicStream out) throws RetryException
- {
- _executor.execute(new Callable<Void>()
- {
- @Override
- public Void call() throws RetryException
- {
- _delegate.prepareBatchRequest(out);
- return null;
- }
- });
- }
-
- @Override
- public void
- finishBatchRequest(final BasicStream out)
- {
- _executor.executeNoThrow(new Callable<Void>()
- {
- @Override
- public Void call() throws RetryException
- {
- _delegate.finishBatchRequest(out);
- return null;
- }
- });
- }
-
- @Override
- public void
- abortBatchRequest()
- {
- _executor.executeNoThrow(new Callable<Void>()
- {
- @Override
- public Void call()
- {
- _delegate.abortBatchRequest();
- return null;
- }
- });
- }
@Override
public int
- sendAsyncRequest(final OutgoingAsyncBase out) throws RetryException
+ sendAsyncRequest(final ProxyOutgoingAsyncBase out) throws RetryException
{
return _executor.execute(new Callable<Integer>()
{
@@ -148,14 +87,6 @@ public class QueueRequestHandler implements RequestHandler
return _delegate.getConnection();
}
- @Override
- public ConnectionI
- waitForConnection()
- throws InterruptedException, RetryException
- {
- return _delegate.waitForConnection();
- }
-
private final RequestHandler _delegate;
private final QueueExecutorService _executor;
}
diff --git a/java/src/Ice/src/main/java/IceInternal/Reference.java b/java/src/Ice/src/main/java/IceInternal/Reference.java
index 8e70e0b438d..d06d45080e2 100644
--- a/java/src/Ice/src/main/java/IceInternal/Reference.java
+++ b/java/src/Ice/src/main/java/IceInternal/Reference.java
@@ -412,7 +412,9 @@ public abstract class Reference implements Cloneable
//
public abstract java.util.Map<String, String> toProperty(String prefix);
- public abstract void getConnection(GetConnectionCallback callback);
+ public abstract RequestHandler getRequestHandler(Ice.ObjectPrxHelperBase proxy);
+
+ public abstract BatchRequestQueue getBatchRequestQueue();
@Override
public boolean
diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java
index 4130f27217f..370bbff0901 100644
--- a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java
@@ -11,20 +11,12 @@ package IceInternal;
public interface RequestHandler extends CancellationHandler
{
- RequestHandler connect(Ice.ObjectPrxHelperBase proxy);
RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler);
- void prepareBatchRequest(BasicStream out)
- throws RetryException;
- void finishBatchRequest(BasicStream out);
- void abortBatchRequest();
-
- int sendAsyncRequest(OutgoingAsyncBase out)
+ int sendAsyncRequest(ProxyOutgoingAsyncBase out)
throws RetryException;
Reference getReference();
Ice.ConnectionI getConnection();
- Ice.ConnectionI waitForConnection()
- throws InterruptedException, RetryException;
}
diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java
index 5e45d69b8a5..1c505f883e0 100644
--- a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java
+++ b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java
@@ -11,6 +11,7 @@ package IceInternal;
import java.util.Map;
import java.util.HashMap;
+import java.util.concurrent.Callable;
public final class RequestHandlerFactory
{
@@ -19,19 +20,20 @@ public final class RequestHandlerFactory
_instance = instance;
}
- public RequestHandler
- getRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy)
+ public RequestHandler
+ getRequestHandler(final RoutableReference ref, Ice.ObjectPrxHelperBase proxy)
{
if(ref.getCollocationOptimized())
{
Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy);
if(adapter != null)
{
- return new CollocatedRequestHandler(ref, adapter);
+ return proxy.__setRequestHandler(new CollocatedRequestHandler(ref, adapter));
}
}
- RequestHandler handler;
+ ConnectRequestHandler handler = null;
+ boolean connect = false;
if(ref.getCacheConnection())
{
synchronized(this)
@@ -41,25 +43,40 @@ public final class RequestHandlerFactory
{
handler = new ConnectRequestHandler(ref, proxy);
_handlers.put(ref, handler);
+ connect = true;
}
}
}
else
{
handler = new ConnectRequestHandler(ref, proxy);
+ connect = true;
}
- if(_instance.queueRequests())
+ if(connect)
{
- return new QueueRequestHandler(_instance, handler);
- }
- else
- {
- return handler;
+ if(_instance.queueRequests())
+ {
+ final ConnectRequestHandler h = handler;
+ _instance.getQueueExecutor().executeNoThrow(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ ref.getConnection(h);
+ return null;
+ }
+ });
+ }
+ else
+ {
+ ref.getConnection(handler);
+ }
}
+ return proxy.__setRequestHandler(handler.connect(proxy));
}
- void
+ void
removeRequestHandler(Reference ref, RequestHandler handler)
{
if(ref.getCacheConnection())
@@ -75,5 +92,5 @@ public final class RequestHandlerFactory
}
private final Instance _instance;
- private final Map<Reference, RequestHandler> _handlers = new HashMap<Reference, RequestHandler>();
+ private final Map<Reference, ConnectRequestHandler> _handlers = new HashMap<Reference, ConnectRequestHandler>();
}
diff --git a/java/src/Ice/src/main/java/IceInternal/RoutableReference.java b/java/src/Ice/src/main/java/IceInternal/RoutableReference.java
index 130dc8e9d0b..4181ffda251 100644
--- a/java/src/Ice/src/main/java/IceInternal/RoutableReference.java
+++ b/java/src/Ice/src/main/java/IceInternal/RoutableReference.java
@@ -496,6 +496,19 @@ public class RoutableReference extends Reference
}
@Override
+ public RequestHandler
+ getRequestHandler(Ice.ObjectPrxHelperBase proxy)
+ {
+ return getInstance().requestHandlerFactory().getRequestHandler(this, proxy);
+ }
+
+ @Override
+ public BatchRequestQueue
+ getBatchRequestQueue()
+ {
+ return new BatchRequestQueue(getInstance(), getMode() == Reference.ModeBatchDatagram);
+ }
+
public void
getConnection(final GetConnectionCallback callback)
{
diff --git a/java/test/src/main/java/test/Ice/ami/AMI.java b/java/test/src/main/java/test/Ice/ami/AMI.java
index 6a1e709f66b..db1cb054a94 100644
--- a/java/test/src/main/java/test/Ice/ami/AMI.java
+++ b/java/test/src/main/java/test/Ice/ami/AMI.java
@@ -1818,7 +1818,7 @@ public class AMI
TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
b1.opBatch();
b1.ice_getConnection().close(false);
- final FlushExCallback cb = new FlushExCallback();
+ final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.begin_ice_flushBatchRequests(
new Ice.Callback()
{
@@ -1835,9 +1835,9 @@ public class AMI
}
});
cb.check();
- test(!r.isSent());
+ test(r.isSent());
test(r.isCompleted());
- test(p.opBatchCount() == 0);
+ test(p.waitForBatch(1));
}
{
@@ -1877,9 +1877,10 @@ public class AMI
//
test(p.opBatchCount() == 0);
TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ b1.ice_getConnection();
b1.opBatch();
b1.ice_getConnection().close(false);
- final FlushExCallback cb = new FlushExCallback();
+ final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.begin_ice_flushBatchRequests(
new Ice.Callback_Object_ice_flushBatchRequests()
{
@@ -1896,9 +1897,9 @@ public class AMI
}
});
cb.check();
- test(!r.isSent());
+ test(r.isSent());
test(r.isCompleted());
- test(p.opBatchCount() == 0);
+ test(p.waitForBatch(1));
}
}
out.println("ok");
@@ -1913,7 +1914,8 @@ public class AMI
// AsyncResult.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -1943,7 +1945,8 @@ public class AMI
// AsyncResult exception.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushExCallback cb = new FlushExCallback();
@@ -1973,7 +1976,8 @@ public class AMI
// Type-safe.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -2003,7 +2007,8 @@ public class AMI
// Type-safe exception.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushExCallback cb = new FlushExCallback();
@@ -2038,7 +2043,8 @@ public class AMI
// AsyncResult - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -2068,7 +2074,8 @@ public class AMI
// AsyncResult exception - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushCallback cb = new FlushCallback();
@@ -2098,8 +2105,10 @@ public class AMI
// AsyncResult - 2 connections.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b1.opBatch();
@@ -2135,8 +2144,10 @@ public class AMI
// Exceptions should not be reported.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
@@ -2170,8 +2181,10 @@ public class AMI
// The sent callback should be invoked even if all connections fail.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
@@ -2204,7 +2217,8 @@ public class AMI
// Type-safe - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -2234,7 +2248,8 @@ public class AMI
// Type-safe exception - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushCallback cb = new FlushCallback();
@@ -2264,8 +2279,10 @@ public class AMI
// 2 connections.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b1.opBatch();
@@ -2301,8 +2318,10 @@ public class AMI
// Exceptions should not be reported.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
@@ -2336,8 +2355,10 @@ public class AMI
// The sent callback should be invoked even if all connections fail.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
diff --git a/java/test/src/main/java/test/Ice/ami/lambda/AMI.java b/java/test/src/main/java/test/Ice/ami/lambda/AMI.java
index 09b3cfbe23b..d246212bda3 100644
--- a/java/test/src/main/java/test/Ice/ami/lambda/AMI.java
+++ b/java/test/src/main/java/test/Ice/ami/lambda/AMI.java
@@ -846,15 +846,15 @@ public class AMI
TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
b1.opBatch();
b1.ice_getConnection().close(false);
- final FlushExCallback cb = new FlushExCallback();
+ final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.begin_ice_flushBatchRequests(
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
cb.check();
- test(!r.isSent());
+ test(r.isSent());
test(r.isCompleted());
- test(p.opBatchCount() == 0);
+ test(p.waitForBatch(1));
}
}
out.println("ok");
@@ -869,7 +869,8 @@ public class AMI
// Type-safe.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -897,7 +898,8 @@ public class AMI
// Type-safe exception.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushExCallback cb = new FlushExCallback();
@@ -921,7 +923,8 @@ public class AMI
// Type-safe - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.opBatch();
final FlushCallback cb = new FlushCallback();
@@ -940,7 +943,8 @@ public class AMI
// Type-safe exception - 1 connection.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
b1.opBatch();
b1.ice_getConnection().close(false);
final FlushCallback cb = new FlushCallback();
@@ -959,8 +963,10 @@ public class AMI
// 2 connections.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b1.opBatch();
@@ -985,8 +991,10 @@ public class AMI
// Exceptions should not be reported.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
@@ -1009,8 +1017,10 @@ public class AMI
// The sent callback should be invoked even if all connections fail.
//
test(p.opBatchCount() == 0);
- TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway();
- TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway();
+ TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy(
+ p.ice_getIdentity()).ice_batchOneway());
+ TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast(
+ p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway());
b2.ice_getConnection(); // Ensure connection is established.
b1.opBatch();
b2.opBatch();
diff --git a/java/test/src/main/java/test/Ice/background/AllTests.java b/java/test/src/main/java/test/Ice/background/AllTests.java
index 6b890a32c8a..87eb9d4ed7a 100644
--- a/java/test/src/main/java/test/Ice/background/AllTests.java
+++ b/java/test/src/main/java/test/Ice/background/AllTests.java
@@ -726,7 +726,16 @@ public class AllTests
configuration.initializeSocketStatus(IceInternal.SocketOperation.Write);
background.ice_getCachedConnection().close(true);
- background.ice_ping();
+
+ try
+ {
+ background.ice_ping();
+ }
+ catch(Ice.LocalException ex)
+ {
+ test(false); // Something's wrong with retries.
+ }
+
configuration.initializeSocketStatus(IceInternal.SocketOperation.None);
ctl.initializeException(true);
@@ -932,16 +941,6 @@ public class AllTests
//
// First send small requests to test without auto-flushing.
//
- backgroundBatchOneway.ice_ping();
- backgroundBatchOneway.ice_getConnection().close(false);
- try
- {
- backgroundBatchOneway.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
ctl.holdAdapter();
backgroundBatchOneway.op();
backgroundBatchOneway.op();
@@ -949,20 +948,11 @@ public class AllTests
backgroundBatchOneway.op();
ctl.resumeAdapter();
backgroundBatchOneway.ice_flushBatchRequests();
+ backgroundBatchOneway.ice_getConnection().close(false);
//
// Send bigger requests to test with auto-flushing.
//
- backgroundBatchOneway.ice_ping();
- backgroundBatchOneway.ice_getConnection().close(false);
- try
- {
- backgroundBatchOneway.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
ctl.holdAdapter();
backgroundBatchOneway.opWithPayload(seq);
backgroundBatchOneway.opWithPayload(seq);
@@ -970,21 +960,11 @@ public class AllTests
backgroundBatchOneway.opWithPayload(seq);
ctl.resumeAdapter();
backgroundBatchOneway.ice_flushBatchRequests();
+ backgroundBatchOneway.ice_getConnection().close(false);
//
// Then try the same thing with async flush.
//
-
- backgroundBatchOneway.ice_ping();
- backgroundBatchOneway.ice_getConnection().close(false);
- try
- {
- backgroundBatchOneway.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
ctl.holdAdapter();
backgroundBatchOneway.op();
backgroundBatchOneway.op();
@@ -994,16 +974,6 @@ public class AllTests
backgroundBatchOneway.begin_ice_flushBatchRequests();
backgroundBatchOneway.ice_getConnection().close(false);
- backgroundBatchOneway.ice_ping();
- backgroundBatchOneway.ice_getConnection().close(false);
- try
- {
- backgroundBatchOneway.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
ctl.holdAdapter();
backgroundBatchOneway.opWithPayload(seq);
backgroundBatchOneway.opWithPayload(seq);
@@ -1011,15 +981,6 @@ public class AllTests
backgroundBatchOneway.opWithPayload(seq);
ctl.resumeAdapter();
r = backgroundBatchOneway.begin_ice_flushBatchRequests();
- //
- // We can't close the connection before ensuring all the batches
- // have been sent since with auto-flushing the close connection
- // message might be sent once the first call opWithPayload is sent
- // and before the flushBatchRequests (this would therefore result
- // in the flush to report a CloseConnectionException). Instead we
- // wait for the first flush to complete.
- //
- //backgroundBatchOneway.ice_getConnection().close(false);
backgroundBatchOneway.end_ice_flushBatchRequests(r);
backgroundBatchOneway.ice_getConnection().close(false);
}
diff --git a/java/test/src/main/java/test/Ice/invoke/AllTests.java b/java/test/src/main/java/test/Ice/invoke/AllTests.java
index b07b9805346..de577181c4f 100644
--- a/java/test/src/main/java/test/Ice/invoke/AllTests.java
+++ b/java/test/src/main/java/test/Ice/invoke/AllTests.java
@@ -239,6 +239,7 @@ public class AllTests
Ice.ObjectPrx base = communicator.stringToProxy(ref);
MyClassPrx cl = MyClassPrxHelper.checkedCast(base);
MyClassPrx oneway = MyClassPrxHelper.uncheckedCast(cl.ice_oneway());
+ MyClassPrx batchOneway = MyClassPrxHelper.uncheckedCast(cl.ice_batchOneway());
out.print("testing ice_invoke... ");
out.flush();
@@ -249,6 +250,12 @@ public class AllTests
test(false);
}
+ test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null));
+ test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null));
+ test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null));
+ test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null));
+ batchOneway.ice_flushBatchRequests();
+
Ice.OutputStream outS = Ice.Util.createOutputStream(communicator);
outS.startEncapsulation();
outS.writeString(testString);
diff --git a/java/test/src/main/java/test/Ice/metrics/Client.java b/java/test/src/main/java/test/Ice/metrics/Client.java
index d009e428b79..0f2e565f939 100644
--- a/java/test/src/main/java/test/Ice/metrics/Client.java
+++ b/java/test/src/main/java/test/Ice/metrics/Client.java
@@ -43,6 +43,7 @@ public class Client extends test.Util.Application
initData.properties.setProperty("Ice.Warn.Connections", "0");
initData.properties.setProperty("Ice.MessageSizeMax", "50000");
initData.properties.setProperty("Ice.Default.Host", "127.0.0.1");
+
initData.observer = _observer;
return initData;
}
diff --git a/java/test/src/main/java/test/Ice/operations/BatchOneways.java b/java/test/src/main/java/test/Ice/operations/BatchOneways.java
index 6f914b46f97..6982d89a4c3 100644
--- a/java/test/src/main/java/test/Ice/operations/BatchOneways.java
+++ b/java/test/src/main/java/test/Ice/operations/BatchOneways.java
@@ -25,32 +25,60 @@ class BatchOneways
}
}
- static void
- batchOneways(MyClassPrx p, PrintWriter out)
+ static class BatchRequestInterceptorI implements Ice.BatchRequestInterceptor
{
- final byte[] bs1 = new byte[10 * 1024];
- final byte[] bs2 = new byte[99 * 1024];
-
- try
+ public void
+ enqueue(Ice.BatchRequest request, int count, int size)
{
- p.opByteSOneway(bs1);
- }
- catch(Ice.MemoryLimitException ex)
- {
- test(false);
+ test(request.getOperation().equals("opByteSOneway") || request.getOperation().equals("ice_ping"));
+ test(request.getProxy().ice_isBatchOneway());
+
+ if(count > 0)
+ {
+ test(_lastRequestSize + _size == size);
+ }
+ _count = count;
+ _size = size;
+
+ if(_size + request.getSize() > 25000)
+ {
+ request.getProxy().begin_ice_flushBatchRequests();
+ _size = 18; // header
+ }
+
+ if(_enabled)
+ {
+ _lastRequestSize = request.getSize();
+ ++_count;
+ request.enqueue();
+ }
}
- try
+ public void
+ setEnqueue(boolean enabled)
{
- p.opByteSOneway(bs2);
+ _enabled = enabled;
}
- catch(Ice.MemoryLimitException ex)
+
+ public int
+ count()
{
- test(false);
+ return _count;
}
+ private boolean _enabled;
+ private int _count;
+ private int _size;
+ private int _lastRequestSize;
+ };
+
+ static void
+ batchOneways(MyClassPrx p, PrintWriter out)
+ {
+ final byte[] bs1 = new byte[10 * 1024];
+
MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
- batch.ice_flushBatchRequests();
+ batch.ice_flushBatchRequests(); // Empty flush
p.opByteSOnewayCallCount(); // Reset the call count
@@ -81,39 +109,22 @@ class BatchOneways
if(batch.ice_getConnection() != null)
{
- batch.ice_getConnection().flushBatchRequests();
-
- MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
+ MyClassPrx batch1 = (MyClassPrx)p.ice_batchOneway();
+ MyClassPrx batch2 = (MyClassPrx)p.ice_batchOneway();
- batch.ice_ping();
+ batch1.ice_ping();
batch2.ice_ping();
- batch.ice_flushBatchRequests();
- batch.ice_getConnection().close(false);
- batch.ice_ping();
+ batch1.ice_flushBatchRequests();
+ batch1.ice_getConnection().close(false);
+ batch1.ice_ping();
batch2.ice_ping();
- batch.ice_getConnection();
+ batch1.ice_getConnection();
batch2.ice_getConnection();
- batch.ice_ping();
- batch.ice_getConnection().close(false);
- try
- {
- batch.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
- try
- {
- batch2.ice_ping();
- test(false);
- }
- catch(Ice.CloseConnectionException ex)
- {
- }
- batch.ice_ping();
+ batch1.ice_ping();
+ batch1.ice_getConnection().close(false);
+ batch1.ice_ping();
batch2.ice_ping();
}
@@ -122,11 +133,49 @@ class BatchOneways
Ice.ObjectPrx batch3 = batch.ice_identity(identity);
batch3.ice_ping();
batch3.ice_flushBatchRequests();
-
+
// Make sure that a bogus batch request doesn't cause troubles to other ones.
batch3.ice_ping();
batch.ice_ping();
batch.ice_flushBatchRequests();
batch.ice_ping();
+
+ if(batch.ice_getConnection() != null)
+ {
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = p.ice_getCommunicator().getProperties()._clone();
+ BatchRequestInterceptorI interceptor = new BatchRequestInterceptorI();
+ initData.batchRequestInterceptor = interceptor;
+ Ice.Communicator ic = Ice.Util.initialize(initData);
+
+ batch = MyClassPrxHelper.uncheckedCast(ic.stringToProxy(p.toString()).ice_batchOneway());
+
+ test(interceptor.count() == 0);
+ batch.ice_ping();
+ batch.ice_ping();
+ batch.ice_ping();
+ test(interceptor.count() == 0);
+
+ interceptor.setEnqueue(true);
+ batch.ice_ping();
+ batch.ice_ping();
+ batch.ice_ping();
+ test(interceptor.count() == 3);
+
+ batch.ice_flushBatchRequests();
+ batch.ice_ping();
+ test(interceptor.count() == 1);
+
+ batch.opByteSOneway(bs1);
+ test(interceptor.count() == 2);
+ batch.opByteSOneway(bs1);
+ test(interceptor.count() == 3);
+
+ batch.opByteSOneway(bs1); // This should trigger the flush
+ batch.ice_ping();
+ test(interceptor.count() == 2);
+
+ ic.destroy();
+ }
}
}
diff --git a/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java b/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java
index 83f664a5ee9..717712f279e 100644
--- a/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java
+++ b/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java
@@ -62,42 +62,13 @@ class BatchOnewaysAMI
static void batchOneways(MyClassPrx p, PrintWriter out)
{
final byte[] bs1 = new byte[10 * 1024];
- final byte[] bs2 = new byte[99 * 1024];
-
- final Callback cb = new Callback();
- p.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway()
- {
- @Override
- public void exception(LocalException ex)
- {
- test(false);
- }
-
- @Override
- public void response()
- {
- cb.called();
- }
- });
- cb.check();
- p.begin_opByteSOneway(bs2, new Callback_MyClass_opByteSOneway()
- {
- @Override
- public void exception(LocalException ex)
- {
- test(false);
- }
-
- @Override
- public void response()
- {
- cb.called();
- }
- });
- cb.check();
MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
- batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests());
+ batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); // Empty flush
+
+ test(batch.begin_ice_flushBatchRequests().isCompleted()); // Empty flush
+ test(batch.begin_ice_flushBatchRequests().isSent()); // Empty flush
+ test(batch.begin_ice_flushBatchRequests().sentSynchronously()); // Empty flush
for(int i = 0; i < 30; ++i)
{
@@ -116,10 +87,21 @@ class BatchOnewaysAMI
});
}
- if(batch.ice_getConnection() != null)
+ int count = 0;
+ while(count < 27) // 3 * 9 requests auto-flushed.
{
- batch.ice_getConnection().end_flushBatchRequests(batch.ice_getConnection().begin_flushBatchRequests());
+ count += p.opByteSOnewayCallCount();
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+ if(batch.ice_getConnection() != null)
+ {
MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
batch.begin_ice_ping();
@@ -134,44 +116,8 @@ class BatchOnewaysAMI
batch.begin_ice_ping();
batch.ice_getConnection().close(false);
- batch.begin_ice_ping(new Ice.Callback_Object_ice_ping()
- {
-
- @Override
- public void response()
- {
- test(false);
- }
-
- @Override
- public void exception(LocalException ex)
- {
- test(ex instanceof Ice.CloseConnectionException);
- cb.called();
- }
-
- });
- cb.check();
- batch2.begin_ice_ping(new Ice.Callback_Object_ice_ping()
- {
-
- @Override
- public void response()
- {
- test(false);
- }
-
- @Override
- public void exception(LocalException ex)
- {
- test(ex instanceof Ice.CloseConnectionException);
- cb.called();
- }
-
- });
- cb.check();
- batch.begin_ice_ping();
- batch2.begin_ice_ping();
+ batch.begin_ice_ping().throwLocalException();
+ batch2.begin_ice_ping().throwLocalException();
}
Ice.Identity identity = new Ice.Identity();