diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-03-10 12:12:10 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-03-10 12:12:10 +0100 |
commit | c6ca68d97aa5bbc2a172e3e35171b5452657fa22 (patch) | |
tree | 46edcca4c8e313285a205bf6fad7c56c452c0cc0 /java | |
parent | Minor JS style fixes (diff) | |
download | ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.bz2 ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.xz ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.zip |
ICE-6170 - fixed behavior of batch requests
Diffstat (limited to 'java')
35 files changed, 907 insertions, 1375 deletions
diff --git a/java/demo/Glacier2/callback/Client.java b/java/demo/Glacier2/callback/Client.java index bb6c8c92d03..07eab42ad0d 100644 --- a/java/demo/Glacier2/callback/Client.java +++ b/java/demo/Glacier2/callback/Client.java @@ -182,12 +182,12 @@ public class Client extends Glacier2.Application if(override != null) { context.put("_ovrd", override); - } + } batchOneway.initiateCallback(onewayR, context); } else if(line.equals("f")) { - communicator().flushBatchRequests(); + batchOneway.ice_flushBatchRequests(); } else if(line.equals("v")) { @@ -221,7 +221,7 @@ public class Client extends Glacier2.Application onewayR.ice_identity(callbackReceiverIdent)); } - System.out.println("callback receiver identity: " + + System.out.println("callback receiver identity: " + communicator().identityToString(twowayR.ice_getIdentity())); } else if(line.equals("s")) diff --git a/java/demo/Ice/hello/Client.java b/java/demo/Ice/hello/Client.java index 184e5abb2f0..5a728fc79da 100644 --- a/java/demo/Ice/hello/Client.java +++ b/java/demo/Ice/hello/Client.java @@ -132,7 +132,8 @@ public class Client extends Ice.Application } else if(line.equals("f")) { - communicator().flushBatchRequests(); + batchOneway.ice_flushBatchRequests(); + batchDatagram.ice_flushBatchRequests(); } else if(line.equals("T")) { @@ -237,4 +238,3 @@ public class Client extends Ice.Application System.exit(status); } } - diff --git a/java/demo/Ice/swing/Client.java b/java/demo/Ice/swing/Client.java index c4530badc0a..e7bb118f04d 100644 --- a/java/demo/Ice/swing/Client.java +++ b/java/demo/Ice/swing/Client.java @@ -460,6 +460,13 @@ public class Client extends JFrame prx = prx.ice_invocationTimeout(timeout); } _helloPrx = Demo.HelloPrxHelper.uncheckedCast(prx); + + // + // The batch requests associated to the proxy are lost when we + // update the proxy. + // + _flush.setEnabled(false); + _status.setText("Ready"); } @@ -583,7 +590,12 @@ public class Client extends JFrame private void flush() { - _communicator.begin_flushBatchRequests(new Ice.Callback_Communicator_flushBatchRequests() + if(_helloPrx == null) + { + return; + } + + _helloPrx.begin_ice_flushBatchRequests(new Ice.Callback_Object_ice_flushBatchRequests() { @Override public void exception(final Ice.LocalException ex) diff --git a/java/demo/IceBox/hello/Client.java b/java/demo/IceBox/hello/Client.java index 7ed2542cde5..f941c513f23 100644 --- a/java/demo/IceBox/hello/Client.java +++ b/java/demo/IceBox/hello/Client.java @@ -110,7 +110,8 @@ public class Client extends Ice.Application } else if(line.equals("f")) { - communicator().flushBatchRequests(); + batchOneway.ice_flushBatchRequests(); + batchDatagram.ice_flushBatchRequests(); } else if(line.equals("x")) { diff --git a/java/demo/IceDiscovery/hello/Client.java b/java/demo/IceDiscovery/hello/Client.java index 36c7cd21702..02997b9c854 100644 --- a/java/demo/IceDiscovery/hello/Client.java +++ b/java/demo/IceDiscovery/hello/Client.java @@ -131,7 +131,8 @@ public class Client extends Ice.Application } else if(line.equals("f")) { - communicator().flushBatchRequests(); + batchOneway.ice_flushBatchRequests(); + batchDatagram.ice_flushBatchRequests(); } else if(line.equals("T")) { @@ -236,4 +237,3 @@ public class Client extends Ice.Application System.exit(status); } } - diff --git a/java/src/Ice/src/main/java/Ice/BatchRequest.java b/java/src/Ice/src/main/java/Ice/BatchRequest.java new file mode 100644 index 00000000000..eb31284a98f --- /dev/null +++ b/java/src/Ice/src/main/java/Ice/BatchRequest.java @@ -0,0 +1,33 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +public interface BatchRequest +{ + /** + * Confirms the queuing of the batch request. + **/ + void enqueue(); + + /** + * The marshalled size of the request. + **/ + int getSize(); + + /** + * The name of the operation + **/ + String getOperation(); + + /** + * The proxy used to invoke the batch request. + **/ + Ice.ObjectPrx getProxy(); +}; diff --git a/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java b/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java new file mode 100644 index 00000000000..42539a0d4cc --- /dev/null +++ b/java/src/Ice/src/main/java/Ice/BatchRequestInterceptor.java @@ -0,0 +1,28 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +/** + * Base interface for listening to batch request queues. + **/ +public interface BatchRequestInterceptor +{ + /** + * Called by the Ice runtime when a batch request is about to be + * added to the batch request queue of a proxy or connection. + * + * The implementation of this method must call enqueue() on the + * request to confirm its addition to the queue, if not called + * the request isn't added to the queue. The implementation can + * raise an Ice local exception to notify the caller of a failure. + * + **/ + void enqueue(Ice.BatchRequest request, int queueBatchRequestCount, int queueBatchRequestSize); +} diff --git a/java/src/Ice/src/main/java/Ice/ConnectionI.java b/java/src/Ice/src/main/java/Ice/ConnectionI.java index bfe8715e339..3a028d88c72 100644 --- a/java/src/Ice/src/main/java/Ice/ConnectionI.java +++ b/java/src/Ice/src/main/java/Ice/ConnectionI.java @@ -318,7 +318,7 @@ public final class ConnectionI extends IceInternal.EventHandler // setState(StateClosed, new ConnectionTimeoutException()); } - else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() && + else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue.isEmpty() && _asyncRequests.isEmpty()) { // @@ -329,7 +329,8 @@ public final class ConnectionI extends IceInternal.EventHandler } } - synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) + synchronized public int + sendAsyncRequest(IceInternal.OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum) throws IceInternal.RetryException { final IceInternal.BasicStream os = out.getOs(); @@ -378,6 +379,11 @@ public final class ConnectionI extends IceInternal.EventHandler os.pos(IceInternal.Protocol.headerSize); os.writeInt(requestId); } + else if(batchRequestNum > 0) + { + os.pos(IceInternal.Protocol.headerSize); + os.writeInt(batchRequestNum); + } out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); @@ -403,189 +409,10 @@ public final class ConnectionI extends IceInternal.EventHandler return status; } - public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException - { - waitBatchStreamInUse(); - - if(_exception != null) - { - // - // If there were no batch requests queued when the connection - // failed, we can safely retry with a new connection. Otherwise, we - // must throw to notify the caller that some previous batch requests - // were not sent. - // - if(_batchStream.isEmpty()) - { - throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace()); - } - else - { - throw (Ice.LocalException) _exception.fillInStackTrace(); - } - } - - assert (_state > StateNotValidated); - assert (_state < StateClosing); - - if(_batchStream.isEmpty()) - { - try - { - _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - throw ex; - } - } - - _batchStreamInUse = true; - _batchMarker = _batchStream.size(); - _batchStream.swap(os); - - // - // The batch stream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. - // - } - - public void finishBatchRequest(IceInternal.BasicStream os, boolean compress) - { - try - { - synchronized(this) - { - // - // Get the batch stream back. - // - _batchStream.swap(os); - - if(_exception != null) - { - return; - } - - boolean flush = false; - if(_batchAutoFlushSize > 0) - { - if(_batchStream.size() > _batchAutoFlushSize) - { - flush = true; - } - - // - // Throw memory limit exception if the first message added - // causes us to go over limit. Otherwise put aside the - // marshalled message that caused limit to be exceeded and - // rollback stream to the marker. - // - try - { - _transceiver.checkSendSize(_batchStream.getBuffer()); - } - catch(Ice.LocalException ex) - { - if(_batchRequestNum > 0) - { - flush = true; - } - else - { - throw ex; - } - } - } - - if(flush) - { - // - // Temporarily save the last request. - // - byte[] lastRequest = new byte[_batchStream.size() - _batchMarker]; - IceInternal.Buffer buffer = _batchStream.getBuffer(); - buffer.b.position(_batchMarker); - buffer.b.get(lastRequest); - _batchStream.resize(_batchMarker, false); - - // - // Send the batch stream without the last request. - // - try - { - // - // Fill in the number of requests in the batch. - // - _batchStream.pos(IceInternal.Protocol.headerSize); - _batchStream.writeInt(_batchRequestNum); - - OutgoingMessage message = new OutgoingMessage(_batchStream, _batchRequestCompress, true); - sendMessage(message); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert (_exception != null); - throw (Ice.LocalException) _exception.fillInStackTrace(); - } - - // - // Reset the batch stream. - // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - - // - // Start a new batch with the last message that caused us to - // go over the limit. - // - _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); - _batchStream.writeBlob(lastRequest); - } - - // - // Increment the number of requests in the batch. - // - ++_batchRequestNum; - - // - // We compress the whole batch if there is at least one - // compressed - // message. - // - if(compress) - { - _batchRequestCompress = true; - } - - // - // Notify about the batch stream not being in use anymore. - // - assert (_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); - } - } - catch(Ice.LocalException ex) - { - abortBatchRequest(); - throw ex; - } - } - - public synchronized void abortBatchRequest() + public IceInternal.BatchRequestQueue + getBatchRequestQueue() { - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - - assert (_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + return _batchRequestQueue; } @Override @@ -652,67 +479,6 @@ public final class ConnectionI extends IceInternal.EventHandler r.__wait(); } - synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync) - { - waitBatchStreamInUse(); - - if(_exception != null) - { - throw (Ice.LocalException) _exception.fillInStackTrace(); - } - - if(_batchRequestNum == 0) - { - int status = IceInternal.AsyncStatus.Sent; - if(outAsync.sent()) - { - status |= IceInternal.AsyncStatus.InvokeSentCallback; - } - return status; - } - - // - // Notify the request that it's cancelable with this connection. - // This will throw if the request is canceled. - // - outAsync.cancelable(this); - - // - // Fill in the number of requests in the batch. - // - _batchStream.pos(IceInternal.Protocol.headerSize); - _batchStream.writeInt(_batchRequestNum); - - _batchStream.swap(outAsync.getOs()); - - outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0); - - // - // Send the batch stream. - // - int status; - try - { - OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0); - status = sendMessage(message); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert (_exception != null); - throw (Ice.LocalException) _exception.fillInStackTrace(); - } - - // - // Reset the batch stream. - // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - return status; - } - @Override synchronized public void setCallback(final ConnectionCallback callback) { @@ -832,7 +598,7 @@ public final class ConnectionI extends IceInternal.EventHandler if(outAsync instanceof IceInternal.OutgoingAsync) { IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync; - java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); + java.util.Iterator<IceInternal.OutgoingAsyncBase> it2 = _asyncRequests.values().iterator(); while(it2.hasNext()) { if(it2.next() == o) @@ -1536,7 +1302,7 @@ public final class ConnectionI extends IceInternal.EventHandler _sendStreams.clear(); } - for(IceInternal.OutgoingAsync p : _asyncRequests.values()) + for(IceInternal.OutgoingAsyncBase p : _asyncRequests.values()) { if(p.completed(_exception)) { @@ -1683,12 +1449,7 @@ public final class ConnectionI extends IceInternal.EventHandler } _nextRequestId = 1; _messageSizeMax = adapter != null ? adapter.messageSizeMax() : instance.messageSizeMax(); - _batchAutoFlushSize = _instance.batchAutoFlushSize(); - _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding); - _batchStreamInUse = false; - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; + _batchRequestQueue = new IceInternal.BatchRequestQueue(instance, _endpoint.datagram()); _readStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding); _readHeader = false; _readStreamPos = -1; @@ -1913,6 +1674,8 @@ public final class ConnectionI extends IceInternal.EventHandler return; } + _batchRequestQueue.destroy(_exception); + // // Don't need to close now for connections so only close the transceiver // if the selector request it. @@ -2505,7 +2268,7 @@ public final class ConnectionI extends IceInternal.EventHandler byte compress; IceInternal.ServantManager servantManager; ObjectAdapter adapter; - IceInternal.OutgoingAsync outAsync; + IceInternal.OutgoingAsyncBase outAsync; ConnectionCallback heartbeatCallback; int messageDispatchCount; } @@ -2633,7 +2396,7 @@ public final class ConnectionI extends IceInternal.EventHandler IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); + IceInternal.OutgoingAsyncBase outAsync = _asyncRequests.remove(info.requestId); if(outAsync != null && outAsync.completed(info.stream)) { info.outAsync = outAsync; @@ -2966,35 +2729,6 @@ public final class ConnectionI extends IceInternal.EventHandler } } - private void waitBatchStreamInUse() - { - // - // This is similar to a mutex lock in that the flag is - // only true for a short time period. As such we don't permit the - // wait to be interrupted. Instead the interrupted status is saved - // and restored. - // - boolean interrupted = false; - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException e) - { - interrupted = true; - } - } - // - // Restore the interrupted flag if we were interrupted. - // - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - private int read(IceInternal.Buffer buf) { int start = buf.b.position(); @@ -3140,18 +2874,13 @@ public final class ConnectionI extends IceInternal.EventHandler private int _nextRequestId; - private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = - new java.util.HashMap<Integer, IceInternal.OutgoingAsync>(); + private java.util.Map<Integer, IceInternal.OutgoingAsyncBase> _asyncRequests = + new java.util.HashMap<Integer, IceInternal.OutgoingAsyncBase>(); private LocalException _exception; private final int _messageSizeMax; - private final int _batchAutoFlushSize; - private IceInternal.BasicStream _batchStream; - private boolean _batchStreamInUse; - private int _batchRequestNum; - private boolean _batchRequestCompress; - private int _batchMarker; + private IceInternal.BatchRequestQueue _batchRequestQueue; private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>(); diff --git a/java/src/Ice/src/main/java/Ice/InitializationData.java b/java/src/Ice/src/main/java/Ice/InitializationData.java index 434a52dec3f..0244df73b8b 100644 --- a/java/src/Ice/src/main/java/Ice/InitializationData.java +++ b/java/src/Ice/src/main/java/Ice/InitializationData.java @@ -83,4 +83,9 @@ public final class InitializationData implements Cloneable * The compact type ID resolver. **/ public CompactIdResolver compactIdResolver; + + /** + * The batch request interceptor. + **/ + public BatchRequestInterceptor batchRequestInterceptor; } diff --git a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java index 900d23f7e3d..6438cab69e9 100644 --- a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java @@ -2273,75 +2273,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final Connection ice_getConnection() { - final InvocationObserver observer = IceInternal.ObserverHelper.get(this, "ice_getConnection"); - int cnt = 0; - if(Thread.interrupted()) - { - throw new Ice.OperationInterruptedException(); - } - try - { - while(true) - { - IceInternal.RequestHandler handler = null; - try - { - handler = __getRequestHandler(); - try - { - // Wait for the connection to be established. - return handler.waitForConnection(); - } - catch(InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - } - catch(RetryException e) - { - // Clear request handler and retry. - __setRequestHandler(handler, null); - } - catch(Ice.Exception ex) - { - try - { - Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); - cnt = __handleException(ex, handler, OperationMode.Idempotent, false, interval, cnt); - if(observer != null) - { - observer.retried(); - } - if(interval.value > 0) - { - try - { - Thread.sleep(interval.value); - } - catch(InterruptedException ex1) - { - throw new Ice.OperationInterruptedException(); - } - } - } - catch(Ice.Exception exc) - { - if(observer != null) - { - observer.failed(exc.ice_name()); - } - throw exc; - } - } - } - } - finally - { - if(observer != null) - { - observer.detach(); - } - } + return end_ice_getConnection(begin_ice_getConnection()); } /** @@ -2632,7 +2564,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable __handleException(Exception ex, IceInternal.RequestHandler handler, OperationMode mode, boolean sent, Holder<Integer> interval, int cnt) { - __setRequestHandler(handler, null); // Clear the request handler + __updateRequestHandler(handler, null); // Clear the request handler // // We only retry local exception, system exceptions aren't retried. @@ -2741,7 +2673,6 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final IceInternal.RequestHandler __getRequestHandler() { - IceInternal.RequestHandler handler; if(_reference.getCacheConnection()) { synchronized(this) @@ -2750,19 +2681,40 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { return _requestHandler; } - handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); - _requestHandler = handler; } } - else + return _reference.getRequestHandler(this); + } + + synchronized public final IceInternal.BatchRequestQueue + __getBatchRequestQueue() + { + if(_batchRequestQueue == null) + { + _batchRequestQueue = _reference.getBatchRequestQueue(); + } + return _batchRequestQueue; + } + + public IceInternal.RequestHandler + __setRequestHandler(IceInternal.RequestHandler handler) + { + if(_reference.getCacheConnection()) { - handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); + synchronized(this) + { + if(_requestHandler == null) + { + _requestHandler = handler; + } + return _requestHandler; + } } - return handler.connect(this); + return handler; } public void - __setRequestHandler(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler) + __updateRequestHandler(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler) { if(_reference.getCacheConnection() && previous != null) { @@ -3036,6 +2988,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private transient IceInternal.Reference _reference; private transient IceInternal.RequestHandler _requestHandler; + private transient IceInternal.BatchRequestQueue _batchRequestQueue; private transient List<StreamCacheEntry> _streamCache; public static final long serialVersionUID = 0L; } diff --git a/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java b/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java new file mode 100644 index 00000000000..5fc582659bf --- /dev/null +++ b/java/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java @@ -0,0 +1,239 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class BatchRequestQueue +{ + class BatchRequestI implements Ice.BatchRequest + { + public void reset(Ice.ObjectPrx proxy, String operation, int size) + { + _proxy = proxy; + _operation = operation; + _size = size; + } + + @Override + public void enqueue() + { + enqueueBatchRequest(); + } + + @Override + public Ice.ObjectPrx getProxy() + { + return _proxy; + } + + @Override + public String getOperation() + { + return _operation; + } + + @Override + public int getSize() + { + return _size; + } + + private Ice.ObjectPrx _proxy; + private String _operation; + private int _size; + }; + + public + BatchRequestQueue(Instance instance, boolean datagram) + { + Ice.InitializationData initData = instance.initializationData(); + _interceptor = initData.batchRequestInterceptor; + _batchStreamInUse = false; + _batchRequestNum = 0; + _batchStream = new BasicStream(instance, Protocol.currentProtocolEncoding); + _batchStream.writeBlob(Protocol.requestBatchHdr); + _batchMarker = _batchStream.size(); + _request = new BatchRequestI(); + + _maxSize = instance.batchAutoFlushSize(); + if(_maxSize > 0 && datagram) + { + int udpSndSize = initData.properties.getPropertyAsIntWithDefault("Ice.UDP.SndSize", 65535 - _udpOverhead); + if(udpSndSize < _maxSize) + { + _maxSize = udpSndSize; + } + } + } + + synchronized public void + prepareBatchRequest(BasicStream os) + { + if(_exception != null) + { + throw (Ice.LocalException)_exception.fillInStackTrace(); + } + + waitStreamInUse(false); + _batchStreamInUse = true; + _batchStream.swap(os); + } + + public void + finishBatchRequest(BasicStream os, Ice.ObjectPrx proxy, String operation) + { + // + // No need for synchronization, no other threads are supposed + // to modify the queue since we set _batchStreamInUse to true. + // + assert(_batchStreamInUse); + _batchStream.swap(os); + + try + { + _batchStreamCanFlush = true; // Allow flush to proceed even if the stream is marked in use. + + if(_maxSize > 0 && _batchStream.size() >= _maxSize) + { + proxy.begin_ice_flushBatchRequests(); // Auto flush + } + + assert(_batchMarker < _batchStream.size()); + if(_interceptor != null) + { + _request.reset(proxy, operation, _batchStream.size() - _batchMarker); + _interceptor.enqueue(_request, _batchRequestNum, _batchMarker); + } + else + { + _batchMarker = _batchStream.size(); + ++_batchRequestNum; + } + } + finally + { + synchronized(this) + { + _batchStream.resize(_batchMarker, false); + _batchStreamInUse = false; + _batchStreamCanFlush = false; + notifyAll(); + } + } + } + + synchronized public void + abortBatchRequest(BasicStream os) + { + if(_batchStreamInUse) + { + _batchStream.swap(os); + _batchStream.resize(_batchMarker, false); + _batchStreamInUse = false; + notifyAll(); + } + } + + synchronized public int + swap(BasicStream os) + { + if(_batchRequestNum == 0) + { + return 0; + } + + waitStreamInUse(true); + + byte[] lastRequest = null; + if(_batchMarker < _batchStream.size()) + { + lastRequest = new byte[_batchStream.size() - _batchMarker]; + Buffer buffer = _batchStream.getBuffer(); + buffer.b.position(_batchMarker); + buffer.b.get(lastRequest); + _batchStream.resize(_batchMarker, false); + } + + int requestNum = _batchRequestNum; + _batchStream.swap(os); + + // + // Reset the batch. + // + _batchRequestNum = 0; + _batchStream.writeBlob(Protocol.requestBatchHdr); + _batchMarker = _batchStream.size(); + if(lastRequest != null) + { + _batchStream.writeBlob(lastRequest); + } + return requestNum; + } + + synchronized public void + destroy(Ice.LocalException ex) + { + _exception = ex; + } + + synchronized public boolean + isEmpty() + { + return _batchStream.size() == Protocol.requestBatchHdr.length; + } + + private void + waitStreamInUse(boolean flush) + { + // + // This is similar to a mutex lock in that the stream is + // only "locked" while marshaling. As such we don't permit the wait + // to be interrupted. Instead the interrupted status is saved and + // restored. + // + boolean interrupted = false; + while(_batchStreamInUse && !(flush && _batchStreamCanFlush)) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + interrupted = true; + } + } + // + // Restore the interrupted flag if we were interrupted. + // + if(interrupted) + { + Thread.currentThread().interrupt(); + } + } + + private void enqueueBatchRequest() + { + assert(_batchMarker < _batchStream.size()); + _batchMarker = _batchStream.size(); + ++_batchRequestNum; + } + + private Ice.BatchRequestInterceptor _interceptor; + private BasicStream _batchStream; + private boolean _batchStreamInUse; + private boolean _batchStreamCanFlush; + private int _batchRequestNum; + private int _batchMarker; + private BatchRequestI _request; + private Ice.LocalException _exception; + private int _maxSize; + + final private static int _udpOverhead = 20 + 8; +}; diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java index df41457e33c..ad563ecccdd 100644 --- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java @@ -13,13 +13,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { private class InvokeAllAsync extends DispatchWorkItem { - private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int invokeNum, boolean batch) + private InvokeAllAsync(OutgoingAsyncBase outAsync, BasicStream os, int requestId, int batchRequestNum) { _outAsync = outAsync; _os = os; _requestId = requestId; - _invokeNum = invokeNum; - _batch = batch; + _batchRequestNum = batchRequestNum; } @Override @@ -27,15 +26,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { if(sentAsync(_outAsync)) { - invokeAll(_os, _requestId, _invokeNum, _batch); + invokeAll(_os, _requestId, _batchRequestNum); } } private final OutgoingAsyncBase _outAsync; private BasicStream _os; private final int _requestId; - private final int _invokeNum; - private final boolean _batch; + private final int _batchRequestNum; } public @@ -43,23 +41,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { _reference = ref; _dispatcher = ref.getInstance().initializationData().dispatcher != null; - _response = _reference.getMode() == Reference.ModeTwoway; _adapter = (Ice.ObjectAdapterI)adapter; + _response = _reference.getMode() == Reference.ModeTwoway; _logger = _reference.getInstance().initializationData().logger; // Cached for better performance. _traceLevels = _reference.getInstance().traceLevels(); // Cached for better performance. - _batchAutoFlushSize = ref.getInstance().batchAutoFlushSize(); _requestId = 0; - _batchStreamInUse = false; - _batchRequestNum = 0; - _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding); - } - - @Override - public RequestHandler - connect(Ice.ObjectPrxHelperBase proxy) - { - return this; } @Override @@ -70,110 +57,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - synchronized public void - prepareBatchRequest(BasicStream os) - { - waitStreamInUse(); - if(_batchStream.isEmpty()) - { - try - { - _batchStream.writeBlob(Protocol.requestBatchHdr); - } - catch(Ice.LocalException ex) - { - throw ex; - } - } - - _batchStreamInUse = true; - _batchMarker = _batchStream.size(); - _batchStream.swap(os); - } - - @Override - public void - finishBatchRequest(BasicStream os) - { - try - { - synchronized(this) - { - _batchStream.swap(os); - - if(_batchAutoFlushSize > 0 && (_batchStream.size() > _batchAutoFlushSize)) - { - // - // Temporarily save the last request. - // - byte[] lastRequest = new byte[_batchStream.size() - _batchMarker]; - Buffer buffer = _batchStream.getBuffer(); - buffer.b.position(_batchMarker); - buffer.b.get(lastRequest); - _batchStream.resize(_batchMarker, false); - - final int invokeNum = _batchRequestNum; - final BasicStream stream = new BasicStream(_reference.getInstance(), - Protocol.currentProtocolEncoding); - stream.swap(_batchStream); - - _adapter.getThreadPool().dispatch( - new DispatchWorkItem() - { - @Override - public void - run() - { - CollocatedRequestHandler.this.invokeAll(stream, 0, invokeNum, true); - } - }); - - // - // Reset the batch. - // - _batchRequestNum = 0; - _batchMarker = 0; - - // - // Start a new batch with the last message that caused us to go over the limit. - // - _batchStream.writeBlob(Protocol.requestBatchHdr); - _batchStream.writeBlob(lastRequest); - } - - // - // Increment the number of requests in the batch. - // - assert(_batchStreamInUse); - ++_batchRequestNum; - _batchStreamInUse = false; - notifyAll(); - } - } - catch(Ice.LocalException ex) - { - abortBatchRequest(); - throw ex; - } - } - - @Override - synchronized public void - abortBatchRequest() - { - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); - } - - @Override public int - sendAsyncRequest(OutgoingAsyncBase outAsync) + sendAsyncRequest(ProxyOutgoingAsyncBase outAsync) { return outAsync.invokeCollocated(this); } @@ -200,7 +85,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { OutgoingAsync o = (OutgoingAsync)outAsync; assert(o != null); - for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet()) + for(java.util.Map.Entry<Integer, OutgoingAsyncBase> e : _asyncRequests.entrySet()) { if(e.getValue() == o) { @@ -219,7 +104,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler public void sendResponse(int requestId, final BasicStream os, byte status, boolean amd) { - OutgoingAsync outAsync = null; + OutgoingAsyncBase outAsync = null; synchronized(this) { assert(_response); @@ -276,7 +161,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public void - invokeException(int requestId, Ice.LocalException ex, int invokeNum, boolean amd) + invokeException(int requestId, Ice.LocalException ex, int batchRequestNum, boolean amd) { handleException(requestId, ex, amd); _adapter.decDirectCount(); @@ -296,14 +181,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return null; } - @Override - public Ice.ConnectionI - waitForConnection() - { - return null; - } - - int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) + int invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, boolean synchronous) { int requestId = 0; synchronized(this) @@ -315,6 +193,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler requestId = ++_requestId; _asyncRequests.put(requestId, outAsync); } + _sendAsyncRequests.put(outAsync, requestId); } @@ -325,75 +204,33 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler // // Treat this collocated call as if it is a synchronous invocation. // - if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response) + if(!_response || _reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) { // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + _adapter.getThreadPool().dispatch( + new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum)); } else if(_dispatcher) { _adapter.getThreadPool().dispatchFromThisThread( - new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum)); } else // Optimization: directly call invokeAll if there's no dispatcher. { if(sentAsync(outAsync)) { - invokeAll(outAsync.getOs(), requestId, 1, false); + invokeAll(outAsync.getOs(), requestId, batchRequestNum); } } } else { - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + _adapter.getThreadPool().dispatch( + new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, batchRequestNum)); } return AsyncStatus.Queued; } - int invokeAsyncBatchRequests(OutgoingAsyncBase outAsync) - { - int invokeNum; - synchronized(this) - { - waitStreamInUse(); - - invokeNum = _batchRequestNum; - if(_batchRequestNum > 0) - { - outAsync.cancelable(this); // This will throw if the request is canceled - - _sendAsyncRequests.put(outAsync, 0); - - assert(!_batchStream.isEmpty()); - _batchStream.swap(outAsync.getOs()); - - // - // Reset the batch stream. - // - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - outAsync.attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true)); - return AsyncStatus.Queued; - } - else if(outAsync.sent()) - { - return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback; - } - else - { - return AsyncStatus.Sent; - } - } - private boolean sentAsync(final OutgoingAsyncBase outAsync) { @@ -420,9 +257,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private void - invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch) + invokeAll(BasicStream os, int requestId, int batchRequestNum) { - if(batch) + if(batchRequestNum > 0) { os.pos(Protocol.requestBatchHdr.length); } @@ -438,13 +275,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { fillInValue(os, Protocol.headerSize, requestId); } - else if(batch) + else if(batchRequestNum > 0) { - fillInValue(os, Protocol.headerSize, invokeNum); + fillInValue(os, Protocol.headerSize, batchRequestNum); } TraceUtil.traceSend(os, _logger, _traceLevels); } + int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1; ServantManager servantManager = _adapter.getServantManager(); try { @@ -502,7 +340,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return; // Ignore exception for oneway messages. } - OutgoingAsync outAsync = null; + OutgoingAsyncBase outAsync = null; synchronized(this) { outAsync = _asyncRequests.remove(requestId); @@ -531,36 +369,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private void - waitStreamInUse() - { - // - // This is similar to a mutex lock in that the stream is - // only "locked" while marshaling. As such we don't permit the wait - // to be interrupted. Instead the interrupted status is saved and - // restored. - // - boolean interrupted = false; - while(_batchStreamInUse) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - interrupted = true; - } - } - // - // Restore the interrupted flag if we were interrupted. - // - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - - private void fillInValue(BasicStream os, int pos, int value) { os.rewriteInt(value, pos); @@ -572,7 +380,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private final Ice.ObjectAdapterI _adapter; private final Ice.Logger _logger; private final TraceLevels _traceLevels; - private int _batchAutoFlushSize; private int _requestId; @@ -582,10 +389,6 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private java.util.Map<OutgoingAsyncBase, Integer> _sendAsyncRequests = new java.util.HashMap<OutgoingAsyncBase, Integer>(); - private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>(); - - private BasicStream _batchStream; - private boolean _batchStreamInUse; - private int _batchRequestNum; - private int _batchMarker; + private java.util.Map<Integer, OutgoingAsyncBase> _asyncRequests = + new java.util.HashMap<Integer, OutgoingAsyncBase>(); } diff --git a/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java index b4134c9dd1f..242341fb159 100644 --- a/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java +++ b/java/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java @@ -49,9 +49,9 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI { public FlushBatch() { - super(CommunicatorFlushBatch.this.getCommunicator(), - CommunicatorFlushBatch.this._instance, - CommunicatorFlushBatch.this.getOperation(), + super(CommunicatorFlushBatch.this.getCommunicator(), + CommunicatorFlushBatch.this._instance, + CommunicatorFlushBatch.this.getOperation(), null); } @@ -81,7 +81,7 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI return false; } - @Override + @Override protected Ice.Instrumentation.InvocationObserver getObserver() { return CommunicatorFlushBatch.this._observer; @@ -95,22 +95,34 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI try { - if(_instance.queueRequests()) + final FlushBatch flushBatch = new FlushBatch(); + final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); + if(batchRequestNum == 0) + { + flushBatch.sent(); + } + else if(_instance.queueRequests()) { - _instance.getQueueExecutor().executeNoThrow(new Callable<Integer>() + _instance.getQueueExecutor().executeNoThrow(new Callable<Void>() { @Override - public Integer call() + public Void call() throws RetryException { - return con.flushAsyncBatchRequests(new FlushBatch()); + con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + return null; } }); } else { - con.flushAsyncBatchRequests(new FlushBatch()); + con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); } } + catch(RetryException ex) + { + doCheck(false); + throw ex.get(); + } catch(Ice.LocalException ex) { doCheck(false); diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java index 6b96a08707a..198625eeef8 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java @@ -16,136 +16,41 @@ import java.util.concurrent.Callable; public class ConnectRequestHandler implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback { - static private class Request - { - Request(BasicStream os) - { - this.os = new BasicStream(os.instance(), Protocol.currentProtocolEncoding); - this.os.swap(os); - } - - Request(OutgoingAsyncBase out) - { - this.outAsync = out; - } - - OutgoingAsyncBase outAsync = null; - BasicStream os = null; - } - - @Override - public RequestHandler + synchronized public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { - // - // Initiate the connection if connect() is called by the proxy that - // created the handler. - // - if(proxy == _proxy && _connect) - { - _connect = false; // Call getConnection only once - _reference.getConnection(this); - } - try { - synchronized(this) - { - if(!initialized()) - { - _proxies.add(proxy); - return this; - } - } - } - catch(Ice.LocalException ex) - { - throw ex; - } - - if(_connectionRequestHandler != null) - { - proxy.__setRequestHandler(this, _connectionRequestHandler); - return _connectionRequestHandler; - } - else - { - return this; - } - } - - @Override - public RequestHandler - update(RequestHandler previousHandler, RequestHandler newHandler) - { - return previousHandler == this ? newHandler : this; - } - - @Override - public void - prepareBatchRequest(BasicStream os) - throws RetryException - { - synchronized(this) - { - waitBatchRequestInProgress(); - if(!initialized()) { - _batchRequestInProgress = true; - _batchStream.swap(os); - return; + _proxies.add(proxy); } } - - _connection.prepareBatchRequest(os); - } - - @Override - public void - finishBatchRequest(BasicStream os) - { - synchronized(this) + catch(Ice.LocalException ex) { - if(!initialized()) // This can't throw until _batchRequestInProgress = false + // + // Only throw if the connection didn't get established. If + // it died after being established, we allow the caller to + // retry the connection establishment by not throwing here. + // + if(_connection == null) { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - _batchStream.swap(os); - - _requests.add(new Request(_batchStream)); - return; + throw ex; } } - _connection.finishBatchRequest(os, _compress); + return _requestHandler; } @Override - public void - abortBatchRequest() + public RequestHandler + update(RequestHandler previousHandler, RequestHandler newHandler) { - synchronized(this) - { - if(!initialized()) // This can't throw until _batchRequestInProgress = false - { - assert(_batchRequestInProgress); - _batchRequestInProgress = false; - notifyAll(); - - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding); - _batchStream.swap(dummy); - - return; - } - } - _connection.abortBatchRequest(); + return previousHandler == this ? newHandler : this; } @Override public int - sendAsyncRequest(OutgoingAsyncBase out) + sendAsyncRequest(ProxyOutgoingAsyncBase out) throws RetryException { synchronized(this) @@ -159,7 +64,7 @@ public class ConnectRequestHandler { if(!initialized()) { - _requests.add(new Request(out)); + _requests.add(out); return AsyncStatus.Queued; } } @@ -168,7 +73,7 @@ public class ConnectRequestHandler throw new RetryException(ex); } } - return out.send(_connection, _compress, _response); + return out.invokeRemote(_connection, _compress, _response); } @Override @@ -184,11 +89,11 @@ public class ConnectRequestHandler if(!initialized()) { - java.util.Iterator<Request> it = _requests.iterator(); + java.util.Iterator<ProxyOutgoingAsyncBase> it = _requests.iterator(); while(it.hasNext()) { - Request request = it.next(); - if(request.outAsync == outAsync) + OutgoingAsyncBase request = it.next(); + if(request == outAsync) { it.remove(); if(outAsync.completed(ex)) @@ -225,26 +130,6 @@ public class ConnectRequestHandler } } - @Override - synchronized public - ConnectionI waitForConnection() - throws InterruptedException, RetryException - { - if(_exception != null) - { - throw new RetryException(_exception); - } - - // - // Wait for the connection establishment to complete or fail. - // - while(!_initialized && _exception == null) - { - wait(); - } - return getConnection(); - } - // // Implementation of Reference.GetConnectionCallback // @@ -300,14 +185,11 @@ public class ConnectRequestHandler // Ignore } - for(Request request : _requests) + for(OutgoingAsyncBase outAsync : _requests) { - if(request.outAsync != null) + if(outAsync.completed(_exception)) { - if(request.outAsync.completed(_exception)) - { - request.outAsync.invokeCompletedAsync(); - } + outAsync.invokeCompletedAsync(); } } _requests.clear(); @@ -332,13 +214,19 @@ public class ConnectRequestHandler ConnectRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) { _reference = ref; - _connect = true; _response = _reference.getMode() == Reference.ModeTwoway; _proxy = (Ice.ObjectPrxHelperBase)proxy; _initialized = false; _flushing = false; - _batchRequestInProgress = false; - _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding); + + if(_reference.getInstance().queueRequests()) + { + _requestHandler = new QueueRequestHandler(_reference.getInstance(), this); + } + else + { + _requestHandler = this; + } } private boolean @@ -415,7 +303,6 @@ public class ConnectRequestHandler synchronized(this) { assert(_connection != null && !_initialized); - waitBatchRequestInProgress(); // // We set the _flushing flag to true to prevent any additional queuing. Callers @@ -425,61 +312,34 @@ public class ConnectRequestHandler _flushing = true; } - java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true Ice.LocalException exception = null; - while(p.hasNext()) + for(ProxyOutgoingAsyncBase outAsync : _requests) { - Request request = p.next(); try { - if(request.os != null) - { - BasicStream os = new BasicStream(request.os.instance(), Protocol.currentProtocolEncoding); - _connection.prepareBatchRequest(os); - try - { - request.os.pos(0); - os.writeBlob(request.os.readBlob(request.os.size())); - } - catch(Ice.LocalException ex) - { - _connection.abortBatchRequest(); - throw ex; - } - _connection.finishBatchRequest(os, _compress); - } - else if((request.outAsync.send(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0) + if((outAsync.invokeRemote(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0) { - request.outAsync.invokeSentAsync(); + outAsync.invokeSentAsync(); } } catch(RetryException ex) { exception = ex.get(); - try - { - // Remove the request handler before retrying. - _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); - } - catch(Ice.CommunicatorDestroyedException exc) - { - // Ignore - } - if(request.outAsync != null) - { - request.outAsync.retryException(ex.get()); - } + + // Remove the request handler before retrying. + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + outAsync.retryException(ex.get()); } catch(Ice.LocalException ex) { exception = ex; - if(request.outAsync != null && request.outAsync.completed(ex)) + if(outAsync.completed(ex)) { - request.outAsync.invokeCompletedAsync(); + outAsync.invokeCompletedAsync(); } } - p.remove(); } + _requests.clear(); // // If we aren't caching the connection, don't bother creating a @@ -489,10 +349,14 @@ public class ConnectRequestHandler // if(_reference.getCacheConnection() && exception == null) { - _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + _requestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + if(_reference.getInstance().queueRequests()) + { + _requestHandler = new QueueRequestHandler(_reference.getInstance(), _requestHandler); + } for(Ice.ObjectPrxHelperBase proxy : _proxies) { - proxy.__setRequestHandler(this, _connectionRequestHandler); + proxy.__updateRequestHandler(this, _requestHandler); } } @@ -502,56 +366,22 @@ public class ConnectRequestHandler _exception = exception; _initialized = _exception == null; _flushing = false; - try - { - // - // Only remove once all the requests are flushed to - // guarantee serialization. - // - _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); - } - catch(Ice.CommunicatorDestroyedException ex) - { - // Ignore - } + + // + // Only remove once all the requests are flushed to + // guarantee serialization. + // + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + _proxies.clear(); _proxy = null; // Break cyclic reference count. notifyAll(); } } - private void - waitBatchRequestInProgress() - { - // - // This is similar to a mutex lock in that the stream is - // only "locked" while the request is in progress. - // - boolean interrupted = false; - while(_batchRequestInProgress) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - interrupted = true; - } - } - // - // Restore the interrupted flag if we were interrupted. - // - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - private final Reference _reference; - private boolean _connect; private boolean _response; - + private Ice.ObjectPrxHelperBase _proxy; private java.util.Set<Ice.ObjectPrxHelperBase> _proxies = new java.util.HashSet<Ice.ObjectPrxHelperBase>(); @@ -561,9 +391,6 @@ public class ConnectRequestHandler private boolean _initialized; private boolean _flushing; - private java.util.List<Request> _requests = new java.util.LinkedList<Request>(); - private boolean _batchRequestInProgress; - private BasicStream _batchStream; - - private RequestHandler _connectionRequestHandler; + private java.util.List<ProxyOutgoingAsyncBase> _requests = new java.util.LinkedList<ProxyOutgoingAsyncBase>(); + private RequestHandler _requestHandler; } diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java index 0a52e4162aa..7f9ed1de6f5 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java @@ -41,28 +41,38 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase { return _connection; } - + public void invoke() { try { + final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os); + int status; - if(_instance.queueRequests()) + if(batchRequestNum == 0) + { + status = IceInternal.AsyncStatus.Sent; + if(sent()) + { + status |= IceInternal.AsyncStatus.InvokeSentCallback; + } + } + else if(_instance.queueRequests()) { status = _instance.getQueueExecutor().executeNoThrow(new Callable<Integer>() { @Override - public Integer call() + public Integer call() throws RetryException { - return _connection.flushAsyncBatchRequests(ConnectionFlushBatch.this); + return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum); } }); } else { - status = _connection.flushAsyncBatchRequests(this); + status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); } - + if((status & AsyncStatus.Sent) > 0) { _sentSynchronously = true; @@ -72,6 +82,13 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase } } } + catch(RetryException ex) + { + if(completed(ex.get())) + { + invokeCompletedAsync(); + } + } catch(Ice.Exception ex) { if(completed(ex)) diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java index 36edd3bfcf8..398cc0836f0 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java @@ -12,14 +12,7 @@ package IceInternal; public class ConnectionRequestHandler implements RequestHandler { @Override - public RequestHandler - connect(Ice.ObjectPrxHelperBase proxy) - { - return this; - } - - @Override - public RequestHandler + public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) { try @@ -44,34 +37,12 @@ public class ConnectionRequestHandler implements RequestHandler } return this; } - - @Override - public void - prepareBatchRequest(BasicStream out) - throws RetryException - { - _connection.prepareBatchRequest(out); - } @Override - public void - finishBatchRequest(BasicStream out) - { - _connection.finishBatchRequest(out, _compress); - } - - @Override - public void - abortBatchRequest() - { - _connection.abortBatchRequest(); - } - - @Override - public int sendAsyncRequest(OutgoingAsyncBase out) + public int sendAsyncRequest(ProxyOutgoingAsyncBase out) throws RetryException { - return out.send(_connection, _compress, _response); + return out.invokeRemote(_connection, _compress, _response); } @Override @@ -95,14 +66,7 @@ public class ConnectionRequestHandler implements RequestHandler return _connection; } - @Override - public Ice.ConnectionI - waitForConnection() - { - return _connection; - } - - public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress) + public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress) { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; diff --git a/java/src/Ice/src/main/java/IceInternal/FixedReference.java b/java/src/Ice/src/main/java/IceInternal/FixedReference.java index 6bf2afc52b5..20c108471f8 100644 --- a/java/src/Ice/src/main/java/IceInternal/FixedReference.java +++ b/java/src/Ice/src/main/java/IceInternal/FixedReference.java @@ -210,75 +210,81 @@ public class FixedReference extends Reference } @Override - public void - getConnection(GetConnectionCallback callback) + public RequestHandler + getRequestHandler(Ice.ObjectPrxHelperBase proxy) { - try + switch(getMode()) + { + case Reference.ModeTwoway: + case Reference.ModeOneway: + case Reference.ModeBatchOneway: { - switch(getMode()) + if(_fixedConnection.endpoint().datagram()) { - case Reference.ModeTwoway: - case Reference.ModeOneway: - case Reference.ModeBatchOneway: - { - if(_fixedConnection.endpoint().datagram()) - { - throw new Ice.NoEndpointException(""); - } - break; - } - - case Reference.ModeDatagram: - case Reference.ModeBatchDatagram: - { - if(!_fixedConnection.endpoint().datagram()) - { - throw new Ice.NoEndpointException(""); - } - break; - } + throw new Ice.NoEndpointException(""); } + break; + } - // - // If a secure connection is requested or secure overrides is set, - // check if the connection is secure. - // - boolean secure; - DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); - if(defaultsAndOverrides.overrideSecure) - { - secure = defaultsAndOverrides.overrideSecureValue; - } - else - { - secure = getSecure(); - } - if(secure && !_fixedConnection.endpoint().secure()) + case Reference.ModeDatagram: + case Reference.ModeBatchDatagram: + { + if(!_fixedConnection.endpoint().datagram()) { throw new Ice.NoEndpointException(""); } + break; + } + } + + // + // If a secure connection is requested or secure overrides is set, + // check if the connection is secure. + // + boolean secure; + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideSecure) + { + secure = defaultsAndOverrides.overrideSecureValue; + } + else + { + secure = getSecure(); + } + if(secure && !_fixedConnection.endpoint().secure()) + { + throw new Ice.NoEndpointException(""); + } - _fixedConnection.throwException(); // Throw in case our connection is already destroyed. + _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - boolean compress; - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else if(_overrideCompress) - { - compress = _compress; - } - else - { - compress = _fixedConnection.endpoint().compress(); - } - callback.setConnection(_fixedConnection, compress); + boolean compress; + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; } - catch(Ice.LocalException ex) + else if(_overrideCompress) { - callback.setException(ex); + compress = _compress; } + else + { + compress = _fixedConnection.endpoint().compress(); + } + + RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress); + if(getInstance().queueRequests()) + { + handler = new QueueRequestHandler(getInstance(), handler); + } + return proxy.__setRequestHandler(handler); + } + + @Override + public BatchRequestQueue + getBatchRequestQueue() + { + return _fixedConnection.getBatchRequestQueue(); } @Override diff --git a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java index 0ce16f152ec..b2c6871a14a 100644 --- a/java/src/Ice/src/main/java/IceInternal/IncomingBase.java +++ b/java/src/Ice/src/main/java/IceInternal/IncomingBase.java @@ -93,21 +93,15 @@ class IncomingBase public BasicStream __startWriteParams(Ice.FormatType format) { - if(_response) + if(!_response) { - assert(_os.size() == Protocol.headerSize + 4); // Reply status position. - assert(_current.encoding != null); // Encoding for reply is known. - _os.writeByte((byte)0); - _os.startWriteEncaps(_current.encoding, format); + throw new Ice.MarshalException("can't marshal out parameters for oneway dispatch"); } - // - // We still return the stream even if no response is expected. The - // servant code might still write some out parameters if for - // example a method with out parameters somehow and erroneously - // invoked as oneway (or if the invocation is invoked on a - // blobject and the blobject erroneously writes a response). - // + assert(_os.size() == Protocol.headerSize + 4); // Reply status position. + assert(_current.encoding != null); // Encoding for reply is known. + _os.writeByte((byte)0); + _os.startWriteEncaps(_current.encoding, format); return _os; } @@ -119,14 +113,13 @@ class IncomingBase _observer.userException(); } - if(_response) - { - int save = _os.pos(); - _os.pos(Protocol.headerSize + 4); // Reply status position. - _os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); - _os.pos(save); - _os.endWriteEncaps(); - } + assert(_response); + + int save = _os.pos(); + _os.pos(Protocol.headerSize + 4); // Reply status position. + _os.writeByte(ok ? ReplyStatus.replyOK : ReplyStatus.replyUserException); + _os.pos(save); + _os.endWriteEncaps(); } public void diff --git a/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java b/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java index 080f81b68a2..5c6371d4ab1 100644 --- a/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java +++ b/java/src/Ice/src/main/java/IceInternal/OutgoingAsync.java @@ -65,31 +65,7 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase case Reference.ModeBatchOneway: case Reference.ModeBatchDatagram: { - while(true) - { - try - { - _handler = _proxy.__getRequestHandler(); - _handler.prepareBatchRequest(_os); - break; - } - catch(RetryException ex) - { - // Clear request handler and retry. - _proxy.__setRequestHandler(_handler, null); - } - catch(Ice.LocalException ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - // Clear request handler - _proxy.__setRequestHandler(_handler, null); - _handler = null; - throw ex; - } - } + _proxy.__getBatchRequestQueue().prepareBatchRequest(_os); break; } } @@ -149,10 +125,10 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase } @Override - public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException + public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; - return connection.sendAsyncRequest(this, compress, response); + return connection.sendAsyncRequest(this, compress, response, 0); } @Override @@ -164,7 +140,7 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase // Disable caching by marking the streams as cached! _state |= StateCachedBuffers; } - return handler.invokeAsyncRequest(this, _synchronous); + return handler.invokeAsyncRequest(this, 0, _synchronous); } @Override @@ -173,15 +149,12 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase int mode = _proxy.__reference().getMode(); if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - if(_handler != null) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - _handler.abortBatchRequest(); - } + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _proxy.__getBatchRequestQueue().abortBatchRequest(_os); } super.abort(ex); @@ -192,23 +165,25 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase int mode = _proxy.__reference().getMode(); if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) { - if(_handler != null) - { - _sentSynchronously = true; - _handler.finishBatchRequest(_os); - finished(true); - } - return; // Don't call sent/completed callback for batch AMI requests + // + // NOTE: we don't call sent/completed callbacks for batch AMI requests + // + _sentSynchronously = true; + _proxy.__getBatchRequestQueue().finishBatchRequest(_os, _proxy, getOperation()); + finished(true); + } + else + { + // + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. + // + invokeImpl(true); // userThread = true } - - // - // NOTE: invokeImpl doesn't throw so this can be called from the - // try block with the catch block calling abort() in case of an - // exception. - // - invokeImpl(true); // userThread = true } + @Override public final boolean completed(BasicStream is) { // @@ -218,14 +193,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase // assert(_proxy.ice_isTwoway()); // Can only be called for twoways. - + if(_childObserver != null) { _childObserver.reply(is.size() - Protocol.headerSize - 4); _childObserver.detach(); _childObserver = null; } - + byte replyStatus; try { @@ -236,14 +211,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: { break; } - + case ReplyStatus.replyUserException: { if(_observer != null) @@ -252,14 +227,14 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase } break; } - + case ReplyStatus.replyObjectNotExist: case ReplyStatus.replyFacetNotExist: case ReplyStatus.replyOperationNotExist: { Ice.Identity id = new Ice.Identity(); id.__read(_is); - + // // For compatibility with the old FacetPath. // @@ -277,9 +252,9 @@ public class OutgoingAsync extends ProxyOutgoingAsyncBase { facet = ""; } - + String operation = _is.readString(); - + Ice.RequestFailedException ex = null; switch(replyStatus) { diff --git a/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java index e89bf01f5c9..48b231b22da 100644 --- a/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java +++ b/java/src/Ice/src/main/java/IceInternal/OutgoingAsyncBase.java @@ -16,31 +16,20 @@ package IceInternal; // public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI { - public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException - { - assert(false); // This should be overriden if this object is used with a request handler - return AsyncStatus.Queued; - } - - public int invokeCollocated(CollocatedRequestHandler handler) - { - assert(false); // This should be overriden if this object is used with a request handler - return AsyncStatus.Queued; - } - public boolean sent() { return sent(true); } - public boolean completed(Ice.Exception ex) + public boolean completed(BasicStream is) { - return finished(ex); + assert(false); // Must be implemented by classes that handle responses + return false; } - public void retryException(Ice.Exception ex) + public boolean completed(Ice.Exception ex) { - assert(false); + return finished(ex); } public final void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId) @@ -55,7 +44,7 @@ public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI } } } - + public final void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) { if(_observer != null) diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java b/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java index 865be583cc2..78c9f69b174 100644 --- a/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java +++ b/java/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java @@ -28,19 +28,28 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase { super(prx, operation, callback); _observer = ObserverHelper.get(prx, operation); + _batchRequestNum = prx.__getBatchRequestQueue().swap(_os); } @Override - public int send(Ice.ConnectionI connection, boolean compress, boolean response) + public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { + if(_batchRequestNum == 0) + { + return sent() ? AsyncStatus.Sent | AsyncStatus.InvokeSentCallback : AsyncStatus.Sent; + } _cachedConnection = connection; - return connection.flushAsyncBatchRequests(this); + return connection.sendAsyncRequest(this, compress, false, _batchRequestNum); } @Override public int invokeCollocated(CollocatedRequestHandler handler) { - return handler.invokeAsyncBatchRequests(this); + if(_batchRequestNum == 0) + { + return sent() ? AsyncStatus.Sent | AsyncStatus.InvokeSentCallback : AsyncStatus.Sent; + } + return handler.invokeAsyncRequest(this, _batchRequestNum, false); } public void invoke() @@ -48,18 +57,6 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); invokeImpl(true); // userThread = true } - - @Override - protected void handleRetryException(Ice.Exception exc) - { - _proxy.__setRequestHandler(_handler, null); // Clear request handler - throw exc; // No retries, we want to notify the user of potentially lost batch requests - } - - @Override - protected int handleException(Ice.Exception exc) - { - _proxy.__setRequestHandler(_handler, null); // Clear request handler - throw exc; // No retries, we want to notify the user of potentially lost batch requests - } + + protected int _batchRequestNum; } diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java b/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java index 16069d4c423..753a921eb75 100644 --- a/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java +++ b/java/src/Ice/src/main/java/IceInternal/ProxyGetConnection.java @@ -31,7 +31,7 @@ public class ProxyGetConnection extends ProxyOutgoingAsyncBase } @Override - public int send(Ice.ConnectionI connection, boolean compress, boolean response) + public int invokeRemote(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java index 8a3a3d1e45e..77d5c1cf0fc 100644 --- a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java +++ b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java @@ -30,6 +30,10 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } } + public abstract int invokeRemote(Ice.ConnectionI con, boolean compress, boolean response) throws RetryException; + + public abstract int invokeCollocated(CollocatedRequestHandler handler); + @Override public Ice.ObjectPrx getProxy() { @@ -45,7 +49,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase _childObserver.detach(); _childObserver = null; } - + // // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. @@ -66,7 +70,6 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } } - @Override public void retryException(Ice.Exception ex) { try @@ -77,7 +80,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase // require could end up waiting for the flush of the // connection to be done. // - handleRetryException(ex); + _proxy.__updateRequestHandler(_handler, null); // Clear request handler and always retry. _instance.retryQueue().add(this, 0); } catch(Ice.Exception exc) @@ -93,7 +96,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase { invokeImpl(false); } - + public void cancelable(final CancellationHandler handler) { if(_proxy.__reference().getInvocationTimeout() == -2 && _cachedConnection != null) @@ -114,7 +117,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } super.cancelable(handler); } - + public void abort(Ice.Exception ex) { assert(_childObserver == null); @@ -141,7 +144,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase _cnt = 0; _sent = false; } - + protected ProxyOutgoingAsyncBase(Ice.ObjectPrxHelperBase prx, String op, CallbackBase delegate, BasicStream os) { super(prx.ice_getCommunicator(), prx.__reference().getInstance(), op, delegate, os); @@ -162,7 +165,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } return r; } - + protected void invokeImpl(boolean userThread) { try @@ -196,6 +199,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase try { _sent = false; + _handler = null; _handler = _proxy.__getRequestHandler(); int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) @@ -220,7 +224,7 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } catch(RetryException ex) { - handleRetryException(ex.get()); + _proxy.__updateRequestHandler(_handler, null); // Clear request handler and always retry. } catch(Ice.Exception ex) { @@ -248,8 +252,8 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase // // If called from the user thread we re-throw, the exception // will be catch by the caller and abort() will be called. - // - if(userThread) + // + if(userThread) { throw ex; } @@ -297,18 +301,13 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase return super.finished(ok); } - protected void handleRetryException(Ice.Exception exc) - { - _proxy.__setRequestHandler(_handler, null); // Clear request handler and always retry. - } - protected int handleException(Ice.Exception exc) { Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt); return interval.value; } - + final protected Ice.ObjectPrxHelperBase _proxy; protected RequestHandler _handler; protected Ice.OperationMode _mode; diff --git a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java index 1b5a87578cb..6952862e508 100644 --- a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java @@ -24,29 +24,13 @@ public class QueueRequestHandler implements RequestHandler } @Override - public RequestHandler - connect(final Ice.ObjectPrxHelperBase proxy) - { - _executor.executeNoThrow(new Callable<Void>() - { - @Override - public Void call() - { - _delegate.connect(proxy); - return null; - } - }); - return this; - } - - @Override - public RequestHandler + public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) { // // Only update to new handler if the previous handler matches this one. // - if(previousHandler == this) + if(previousHandler == this || previousHandler == _delegate) { if(newHandler != null) { @@ -59,55 +43,10 @@ public class QueueRequestHandler implements RequestHandler } return this; } - - @Override - public void - prepareBatchRequest(final BasicStream out) throws RetryException - { - _executor.execute(new Callable<Void>() - { - @Override - public Void call() throws RetryException - { - _delegate.prepareBatchRequest(out); - return null; - } - }); - } - - @Override - public void - finishBatchRequest(final BasicStream out) - { - _executor.executeNoThrow(new Callable<Void>() - { - @Override - public Void call() throws RetryException - { - _delegate.finishBatchRequest(out); - return null; - } - }); - } - - @Override - public void - abortBatchRequest() - { - _executor.executeNoThrow(new Callable<Void>() - { - @Override - public Void call() - { - _delegate.abortBatchRequest(); - return null; - } - }); - } @Override public int - sendAsyncRequest(final OutgoingAsyncBase out) throws RetryException + sendAsyncRequest(final ProxyOutgoingAsyncBase out) throws RetryException { return _executor.execute(new Callable<Integer>() { @@ -148,14 +87,6 @@ public class QueueRequestHandler implements RequestHandler return _delegate.getConnection(); } - @Override - public ConnectionI - waitForConnection() - throws InterruptedException, RetryException - { - return _delegate.waitForConnection(); - } - private final RequestHandler _delegate; private final QueueExecutorService _executor; } diff --git a/java/src/Ice/src/main/java/IceInternal/Reference.java b/java/src/Ice/src/main/java/IceInternal/Reference.java index 8e70e0b438d..d06d45080e2 100644 --- a/java/src/Ice/src/main/java/IceInternal/Reference.java +++ b/java/src/Ice/src/main/java/IceInternal/Reference.java @@ -412,7 +412,9 @@ public abstract class Reference implements Cloneable // public abstract java.util.Map<String, String> toProperty(String prefix); - public abstract void getConnection(GetConnectionCallback callback); + public abstract RequestHandler getRequestHandler(Ice.ObjectPrxHelperBase proxy); + + public abstract BatchRequestQueue getBatchRequestQueue(); @Override public boolean diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java index 4130f27217f..370bbff0901 100644 --- a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java @@ -11,20 +11,12 @@ package IceInternal; public interface RequestHandler extends CancellationHandler { - RequestHandler connect(Ice.ObjectPrxHelperBase proxy); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); - void prepareBatchRequest(BasicStream out) - throws RetryException; - void finishBatchRequest(BasicStream out); - void abortBatchRequest(); - - int sendAsyncRequest(OutgoingAsyncBase out) + int sendAsyncRequest(ProxyOutgoingAsyncBase out) throws RetryException; Reference getReference(); Ice.ConnectionI getConnection(); - Ice.ConnectionI waitForConnection() - throws InterruptedException, RetryException; } diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java index 5e45d69b8a5..1c505f883e0 100644 --- a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java @@ -11,6 +11,7 @@ package IceInternal; import java.util.Map; import java.util.HashMap; +import java.util.concurrent.Callable; public final class RequestHandlerFactory { @@ -19,19 +20,20 @@ public final class RequestHandlerFactory _instance = instance; } - public RequestHandler - getRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) + public RequestHandler + getRequestHandler(final RoutableReference ref, Ice.ObjectPrxHelperBase proxy) { if(ref.getCollocationOptimized()) { Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy); if(adapter != null) { - return new CollocatedRequestHandler(ref, adapter); + return proxy.__setRequestHandler(new CollocatedRequestHandler(ref, adapter)); } } - RequestHandler handler; + ConnectRequestHandler handler = null; + boolean connect = false; if(ref.getCacheConnection()) { synchronized(this) @@ -41,25 +43,40 @@ public final class RequestHandlerFactory { handler = new ConnectRequestHandler(ref, proxy); _handlers.put(ref, handler); + connect = true; } } } else { handler = new ConnectRequestHandler(ref, proxy); + connect = true; } - if(_instance.queueRequests()) + if(connect) { - return new QueueRequestHandler(_instance, handler); - } - else - { - return handler; + if(_instance.queueRequests()) + { + final ConnectRequestHandler h = handler; + _instance.getQueueExecutor().executeNoThrow(new Callable<Void>() + { + @Override + public Void call() + { + ref.getConnection(h); + return null; + } + }); + } + else + { + ref.getConnection(handler); + } } + return proxy.__setRequestHandler(handler.connect(proxy)); } - void + void removeRequestHandler(Reference ref, RequestHandler handler) { if(ref.getCacheConnection()) @@ -75,5 +92,5 @@ public final class RequestHandlerFactory } private final Instance _instance; - private final Map<Reference, RequestHandler> _handlers = new HashMap<Reference, RequestHandler>(); + private final Map<Reference, ConnectRequestHandler> _handlers = new HashMap<Reference, ConnectRequestHandler>(); } diff --git a/java/src/Ice/src/main/java/IceInternal/RoutableReference.java b/java/src/Ice/src/main/java/IceInternal/RoutableReference.java index 130dc8e9d0b..4181ffda251 100644 --- a/java/src/Ice/src/main/java/IceInternal/RoutableReference.java +++ b/java/src/Ice/src/main/java/IceInternal/RoutableReference.java @@ -496,6 +496,19 @@ public class RoutableReference extends Reference } @Override + public RequestHandler + getRequestHandler(Ice.ObjectPrxHelperBase proxy) + { + return getInstance().requestHandlerFactory().getRequestHandler(this, proxy); + } + + @Override + public BatchRequestQueue + getBatchRequestQueue() + { + return new BatchRequestQueue(getInstance(), getMode() == Reference.ModeBatchDatagram); + } + public void getConnection(final GetConnectionCallback callback) { diff --git a/java/test/src/main/java/test/Ice/ami/AMI.java b/java/test/src/main/java/test/Ice/ami/AMI.java index 6a1e709f66b..db1cb054a94 100644 --- a/java/test/src/main/java/test/Ice/ami/AMI.java +++ b/java/test/src/main/java/test/Ice/ami/AMI.java @@ -1818,7 +1818,7 @@ public class AMI TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); b1.opBatch(); b1.ice_getConnection().close(false); - final FlushExCallback cb = new FlushExCallback(); + final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests( new Ice.Callback() { @@ -1835,9 +1835,9 @@ public class AMI } }); cb.check(); - test(!r.isSent()); + test(r.isSent()); test(r.isCompleted()); - test(p.opBatchCount() == 0); + test(p.waitForBatch(1)); } { @@ -1877,9 +1877,10 @@ public class AMI // test(p.opBatchCount() == 0); TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + b1.ice_getConnection(); b1.opBatch(); b1.ice_getConnection().close(false); - final FlushExCallback cb = new FlushExCallback(); + final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests( new Ice.Callback_Object_ice_flushBatchRequests() { @@ -1896,9 +1897,9 @@ public class AMI } }); cb.check(); - test(!r.isSent()); + test(r.isSent()); test(r.isCompleted()); - test(p.opBatchCount() == 0); + test(p.waitForBatch(1)); } } out.println("ok"); @@ -1913,7 +1914,8 @@ public class AMI // AsyncResult. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -1943,7 +1945,8 @@ public class AMI // AsyncResult exception. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushExCallback cb = new FlushExCallback(); @@ -1973,7 +1976,8 @@ public class AMI // Type-safe. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -2003,7 +2007,8 @@ public class AMI // Type-safe exception. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushExCallback cb = new FlushExCallback(); @@ -2038,7 +2043,8 @@ public class AMI // AsyncResult - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -2068,7 +2074,8 @@ public class AMI // AsyncResult exception - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushCallback cb = new FlushCallback(); @@ -2098,8 +2105,10 @@ public class AMI // AsyncResult - 2 connections. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b1.opBatch(); @@ -2135,8 +2144,10 @@ public class AMI // Exceptions should not be reported. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); @@ -2170,8 +2181,10 @@ public class AMI // The sent callback should be invoked even if all connections fail. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); @@ -2204,7 +2217,8 @@ public class AMI // Type-safe - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -2234,7 +2248,8 @@ public class AMI // Type-safe exception - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushCallback cb = new FlushCallback(); @@ -2264,8 +2279,10 @@ public class AMI // 2 connections. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b1.opBatch(); @@ -2301,8 +2318,10 @@ public class AMI // Exceptions should not be reported. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); @@ -2336,8 +2355,10 @@ public class AMI // The sent callback should be invoked even if all connections fail. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast( + p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); diff --git a/java/test/src/main/java/test/Ice/ami/lambda/AMI.java b/java/test/src/main/java/test/Ice/ami/lambda/AMI.java index 09b3cfbe23b..d246212bda3 100644 --- a/java/test/src/main/java/test/Ice/ami/lambda/AMI.java +++ b/java/test/src/main/java/test/Ice/ami/lambda/AMI.java @@ -846,15 +846,15 @@ public class AMI TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); b1.opBatch(); b1.ice_getConnection().close(false); - final FlushExCallback cb = new FlushExCallback(); + final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.begin_ice_flushBatchRequests( null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); cb.check(); - test(!r.isSent()); + test(r.isSent()); test(r.isCompleted()); - test(p.opBatchCount() == 0); + test(p.waitForBatch(1)); } } out.println("ok"); @@ -869,7 +869,8 @@ public class AMI // Type-safe. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -897,7 +898,8 @@ public class AMI // Type-safe exception. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushExCallback cb = new FlushExCallback(); @@ -921,7 +923,8 @@ public class AMI // Type-safe - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.opBatch(); final FlushCallback cb = new FlushCallback(); @@ -940,7 +943,8 @@ public class AMI // Type-safe exception - 1 connection. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); b1.opBatch(); b1.ice_getConnection().close(false); final FlushCallback cb = new FlushCallback(); @@ -959,8 +963,10 @@ public class AMI // 2 connections. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b1.opBatch(); @@ -985,8 +991,10 @@ public class AMI // Exceptions should not be reported. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); @@ -1009,8 +1017,10 @@ public class AMI // The sent callback should be invoked even if all connections fail. // test(p.opBatchCount() == 0); - TestIntfPrx b1 = (TestIntfPrx)p.ice_batchOneway(); - TestIntfPrx b2 = (TestIntfPrx)p.ice_connectionId("2").ice_batchOneway(); + TestIntfPrx b1 = TestIntfPrxHelper.uncheckedCast(p.ice_getConnection().createProxy( + p.ice_getIdentity()).ice_batchOneway()); + TestIntfPrx b2 = TestIntfPrxHelper.uncheckedCast( + p.ice_connectionId("2").ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway()); b2.ice_getConnection(); // Ensure connection is established. b1.opBatch(); b2.opBatch(); diff --git a/java/test/src/main/java/test/Ice/background/AllTests.java b/java/test/src/main/java/test/Ice/background/AllTests.java index 6b890a32c8a..87eb9d4ed7a 100644 --- a/java/test/src/main/java/test/Ice/background/AllTests.java +++ b/java/test/src/main/java/test/Ice/background/AllTests.java @@ -726,7 +726,16 @@ public class AllTests configuration.initializeSocketStatus(IceInternal.SocketOperation.Write); background.ice_getCachedConnection().close(true); - background.ice_ping(); + + try + { + background.ice_ping(); + } + catch(Ice.LocalException ex) + { + test(false); // Something's wrong with retries. + } + configuration.initializeSocketStatus(IceInternal.SocketOperation.None); ctl.initializeException(true); @@ -932,16 +941,6 @@ public class AllTests // // First send small requests to test without auto-flushing. // - backgroundBatchOneway.ice_ping(); - backgroundBatchOneway.ice_getConnection().close(false); - try - { - backgroundBatchOneway.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } ctl.holdAdapter(); backgroundBatchOneway.op(); backgroundBatchOneway.op(); @@ -949,20 +948,11 @@ public class AllTests backgroundBatchOneway.op(); ctl.resumeAdapter(); backgroundBatchOneway.ice_flushBatchRequests(); + backgroundBatchOneway.ice_getConnection().close(false); // // Send bigger requests to test with auto-flushing. // - backgroundBatchOneway.ice_ping(); - backgroundBatchOneway.ice_getConnection().close(false); - try - { - backgroundBatchOneway.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } ctl.holdAdapter(); backgroundBatchOneway.opWithPayload(seq); backgroundBatchOneway.opWithPayload(seq); @@ -970,21 +960,11 @@ public class AllTests backgroundBatchOneway.opWithPayload(seq); ctl.resumeAdapter(); backgroundBatchOneway.ice_flushBatchRequests(); + backgroundBatchOneway.ice_getConnection().close(false); // // Then try the same thing with async flush. // - - backgroundBatchOneway.ice_ping(); - backgroundBatchOneway.ice_getConnection().close(false); - try - { - backgroundBatchOneway.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } ctl.holdAdapter(); backgroundBatchOneway.op(); backgroundBatchOneway.op(); @@ -994,16 +974,6 @@ public class AllTests backgroundBatchOneway.begin_ice_flushBatchRequests(); backgroundBatchOneway.ice_getConnection().close(false); - backgroundBatchOneway.ice_ping(); - backgroundBatchOneway.ice_getConnection().close(false); - try - { - backgroundBatchOneway.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } ctl.holdAdapter(); backgroundBatchOneway.opWithPayload(seq); backgroundBatchOneway.opWithPayload(seq); @@ -1011,15 +981,6 @@ public class AllTests backgroundBatchOneway.opWithPayload(seq); ctl.resumeAdapter(); r = backgroundBatchOneway.begin_ice_flushBatchRequests(); - // - // We can't close the connection before ensuring all the batches - // have been sent since with auto-flushing the close connection - // message might be sent once the first call opWithPayload is sent - // and before the flushBatchRequests (this would therefore result - // in the flush to report a CloseConnectionException). Instead we - // wait for the first flush to complete. - // - //backgroundBatchOneway.ice_getConnection().close(false); backgroundBatchOneway.end_ice_flushBatchRequests(r); backgroundBatchOneway.ice_getConnection().close(false); } diff --git a/java/test/src/main/java/test/Ice/invoke/AllTests.java b/java/test/src/main/java/test/Ice/invoke/AllTests.java index b07b9805346..de577181c4f 100644 --- a/java/test/src/main/java/test/Ice/invoke/AllTests.java +++ b/java/test/src/main/java/test/Ice/invoke/AllTests.java @@ -239,6 +239,7 @@ public class AllTests Ice.ObjectPrx base = communicator.stringToProxy(ref); MyClassPrx cl = MyClassPrxHelper.checkedCast(base); MyClassPrx oneway = MyClassPrxHelper.uncheckedCast(cl.ice_oneway()); + MyClassPrx batchOneway = MyClassPrxHelper.uncheckedCast(cl.ice_batchOneway()); out.print("testing ice_invoke... "); out.flush(); @@ -249,6 +250,12 @@ public class AllTests test(false); } + test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null)); + test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null)); + test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null)); + test(batchOneway.ice_invoke("opOneway", Ice.OperationMode.Normal, null, null)); + batchOneway.ice_flushBatchRequests(); + Ice.OutputStream outS = Ice.Util.createOutputStream(communicator); outS.startEncapsulation(); outS.writeString(testString); diff --git a/java/test/src/main/java/test/Ice/metrics/Client.java b/java/test/src/main/java/test/Ice/metrics/Client.java index d009e428b79..0f2e565f939 100644 --- a/java/test/src/main/java/test/Ice/metrics/Client.java +++ b/java/test/src/main/java/test/Ice/metrics/Client.java @@ -43,6 +43,7 @@ public class Client extends test.Util.Application initData.properties.setProperty("Ice.Warn.Connections", "0"); initData.properties.setProperty("Ice.MessageSizeMax", "50000"); initData.properties.setProperty("Ice.Default.Host", "127.0.0.1"); + initData.observer = _observer; return initData; } diff --git a/java/test/src/main/java/test/Ice/operations/BatchOneways.java b/java/test/src/main/java/test/Ice/operations/BatchOneways.java index 6f914b46f97..6982d89a4c3 100644 --- a/java/test/src/main/java/test/Ice/operations/BatchOneways.java +++ b/java/test/src/main/java/test/Ice/operations/BatchOneways.java @@ -25,32 +25,60 @@ class BatchOneways } } - static void - batchOneways(MyClassPrx p, PrintWriter out) + static class BatchRequestInterceptorI implements Ice.BatchRequestInterceptor { - final byte[] bs1 = new byte[10 * 1024]; - final byte[] bs2 = new byte[99 * 1024]; - - try + public void + enqueue(Ice.BatchRequest request, int count, int size) { - p.opByteSOneway(bs1); - } - catch(Ice.MemoryLimitException ex) - { - test(false); + test(request.getOperation().equals("opByteSOneway") || request.getOperation().equals("ice_ping")); + test(request.getProxy().ice_isBatchOneway()); + + if(count > 0) + { + test(_lastRequestSize + _size == size); + } + _count = count; + _size = size; + + if(_size + request.getSize() > 25000) + { + request.getProxy().begin_ice_flushBatchRequests(); + _size = 18; // header + } + + if(_enabled) + { + _lastRequestSize = request.getSize(); + ++_count; + request.enqueue(); + } } - try + public void + setEnqueue(boolean enabled) { - p.opByteSOneway(bs2); + _enabled = enabled; } - catch(Ice.MemoryLimitException ex) + + public int + count() { - test(false); + return _count; } + private boolean _enabled; + private int _count; + private int _size; + private int _lastRequestSize; + }; + + static void + batchOneways(MyClassPrx p, PrintWriter out) + { + final byte[] bs1 = new byte[10 * 1024]; + MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); - batch.ice_flushBatchRequests(); + batch.ice_flushBatchRequests(); // Empty flush p.opByteSOnewayCallCount(); // Reset the call count @@ -81,39 +109,22 @@ class BatchOneways if(batch.ice_getConnection() != null) { - batch.ice_getConnection().flushBatchRequests(); - - MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); + MyClassPrx batch1 = (MyClassPrx)p.ice_batchOneway(); + MyClassPrx batch2 = (MyClassPrx)p.ice_batchOneway(); - batch.ice_ping(); + batch1.ice_ping(); batch2.ice_ping(); - batch.ice_flushBatchRequests(); - batch.ice_getConnection().close(false); - batch.ice_ping(); + batch1.ice_flushBatchRequests(); + batch1.ice_getConnection().close(false); + batch1.ice_ping(); batch2.ice_ping(); - batch.ice_getConnection(); + batch1.ice_getConnection(); batch2.ice_getConnection(); - batch.ice_ping(); - batch.ice_getConnection().close(false); - try - { - batch.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } - try - { - batch2.ice_ping(); - test(false); - } - catch(Ice.CloseConnectionException ex) - { - } - batch.ice_ping(); + batch1.ice_ping(); + batch1.ice_getConnection().close(false); + batch1.ice_ping(); batch2.ice_ping(); } @@ -122,11 +133,49 @@ class BatchOneways Ice.ObjectPrx batch3 = batch.ice_identity(identity); batch3.ice_ping(); batch3.ice_flushBatchRequests(); - + // Make sure that a bogus batch request doesn't cause troubles to other ones. batch3.ice_ping(); batch.ice_ping(); batch.ice_flushBatchRequests(); batch.ice_ping(); + + if(batch.ice_getConnection() != null) + { + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = p.ice_getCommunicator().getProperties()._clone(); + BatchRequestInterceptorI interceptor = new BatchRequestInterceptorI(); + initData.batchRequestInterceptor = interceptor; + Ice.Communicator ic = Ice.Util.initialize(initData); + + batch = MyClassPrxHelper.uncheckedCast(ic.stringToProxy(p.toString()).ice_batchOneway()); + + test(interceptor.count() == 0); + batch.ice_ping(); + batch.ice_ping(); + batch.ice_ping(); + test(interceptor.count() == 0); + + interceptor.setEnqueue(true); + batch.ice_ping(); + batch.ice_ping(); + batch.ice_ping(); + test(interceptor.count() == 3); + + batch.ice_flushBatchRequests(); + batch.ice_ping(); + test(interceptor.count() == 1); + + batch.opByteSOneway(bs1); + test(interceptor.count() == 2); + batch.opByteSOneway(bs1); + test(interceptor.count() == 3); + + batch.opByteSOneway(bs1); // This should trigger the flush + batch.ice_ping(); + test(interceptor.count() == 2); + + ic.destroy(); + } } } diff --git a/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java b/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java index 83f664a5ee9..717712f279e 100644 --- a/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java +++ b/java/test/src/main/java/test/Ice/operations/BatchOnewaysAMI.java @@ -62,42 +62,13 @@ class BatchOnewaysAMI static void batchOneways(MyClassPrx p, PrintWriter out) { final byte[] bs1 = new byte[10 * 1024]; - final byte[] bs2 = new byte[99 * 1024]; - - final Callback cb = new Callback(); - p.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway() - { - @Override - public void exception(LocalException ex) - { - test(false); - } - - @Override - public void response() - { - cb.called(); - } - }); - cb.check(); - p.begin_opByteSOneway(bs2, new Callback_MyClass_opByteSOneway() - { - @Override - public void exception(LocalException ex) - { - test(false); - } - - @Override - public void response() - { - cb.called(); - } - }); - cb.check(); MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); - batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); + batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); // Empty flush + + test(batch.begin_ice_flushBatchRequests().isCompleted()); // Empty flush + test(batch.begin_ice_flushBatchRequests().isSent()); // Empty flush + test(batch.begin_ice_flushBatchRequests().sentSynchronously()); // Empty flush for(int i = 0; i < 30; ++i) { @@ -116,10 +87,21 @@ class BatchOnewaysAMI }); } - if(batch.ice_getConnection() != null) + int count = 0; + while(count < 27) // 3 * 9 requests auto-flushed. { - batch.ice_getConnection().end_flushBatchRequests(batch.ice_getConnection().begin_flushBatchRequests()); + count += p.opByteSOnewayCallCount(); + try + { + Thread.sleep(10); + } + catch(InterruptedException ex) + { + } + } + if(batch.ice_getConnection() != null) + { MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); batch.begin_ice_ping(); @@ -134,44 +116,8 @@ class BatchOnewaysAMI batch.begin_ice_ping(); batch.ice_getConnection().close(false); - batch.begin_ice_ping(new Ice.Callback_Object_ice_ping() - { - - @Override - public void response() - { - test(false); - } - - @Override - public void exception(LocalException ex) - { - test(ex instanceof Ice.CloseConnectionException); - cb.called(); - } - - }); - cb.check(); - batch2.begin_ice_ping(new Ice.Callback_Object_ice_ping() - { - - @Override - public void response() - { - test(false); - } - - @Override - public void exception(LocalException ex) - { - test(ex instanceof Ice.CloseConnectionException); - cb.called(); - } - - }); - cb.check(); - batch.begin_ice_ping(); - batch2.begin_ice_ping(); + batch.begin_ice_ping().throwLocalException(); + batch2.begin_ice_ping().throwLocalException(); } Ice.Identity identity = new Ice.Identity(); |