diff options
Diffstat (limited to 'java/src')
12 files changed, 128 insertions, 44 deletions
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java index 90d1d89467f..48cfd306144 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java @@ -211,18 +211,18 @@ public final class CommunicatorI implements Communicator } @Override - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - _iceI_flushBatchRequestsAsync().waitForResponse(); + _iceI_flushBatchRequestsAsync(compressBatch).waitForResponse(); } @Override - public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync() + public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch) { - return _iceI_flushBatchRequestsAsync(); + return _iceI_flushBatchRequestsAsync(compressBatch); } - public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync() + public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync(CompressBatch compressBatch) { com.zeroc.IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory(); com.zeroc.IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory(); @@ -234,8 +234,8 @@ public final class CommunicatorI implements Communicator com.zeroc.IceInternal.CommunicatorFlushBatch f = new com.zeroc.IceInternal.CommunicatorFlushBatch(this, _instance); - connectionFactory.flushAsyncBatchRequests(f); - adapterFactory.flushAsyncBatchRequests(f); + connectionFactory.flushAsyncBatchRequests(compressBatch, f); + adapterFactory.flushAsyncBatchRequests(compressBatch, f); // // Inform the callback that we have finished initiating all of the diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java index 13b2d7a1dcc..4634a5ca4ec 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java @@ -344,8 +344,7 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler } synchronized public int - sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response, - int batchRequestNum) + sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum) throws com.zeroc.IceInternal.RetryException { final OutputStream os = out.getOs(); @@ -431,17 +430,17 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler } @Override - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync()); + ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync(compressBatch)); } @Override - public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync() + public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch) { com.zeroc.IceInternal.ConnectionFlushBatch f = new com.zeroc.IceInternal.ConnectionFlushBatch(this, _communicator, _instance); - f.invoke(); + f.invoke(compressBatch); return f; } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java index de83357a228..2149b60a207 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java @@ -746,7 +746,8 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushAsyncBatchRequests(com.zeroc.IceInternal.CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, + com.zeroc.IceInternal.CommunicatorFlushBatch outAsync) { List<IncomingConnectionFactory> f; synchronized(this) @@ -755,7 +756,7 @@ public final class ObjectAdapterI implements ObjectAdapter } for(IncomingConnectionFactory p : f) { - p.flushAsyncBatchRequests(outAsync); + p.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java index 3bb4d920c16..ed776c67636 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java @@ -23,7 +23,7 @@ public class BatchRequestQueue @Override public void enqueue() { - enqueueBatchRequest(); + enqueueBatchRequest(_proxy); } @Override @@ -59,6 +59,7 @@ public class BatchRequestQueue _batchStream = new com.zeroc.Ice.OutputStream(instance, Protocol.currentProtocolEncoding); _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); + _batchCompress = false; _request = new BatchRequestI(); _maxSize = instance.batchAutoFlushSize(); @@ -112,6 +113,11 @@ public class BatchRequestQueue } else { + Boolean compress = proxy._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -140,12 +146,18 @@ public class BatchRequestQueue } } - synchronized public int + public class SwapResult + { + public int batchRequestNum; + public boolean compress; + }; + + synchronized public SwapResult swap(com.zeroc.Ice.OutputStream os) { if(_batchRequestNum == 0) { - return 0; + return null; } waitStreamInUse(true); @@ -160,20 +172,23 @@ public class BatchRequestQueue _batchStream.resize(_batchMarker); } - int requestNum = _batchRequestNum; + SwapResult result = new SwapResult(); + result.batchRequestNum = _batchRequestNum; + result.compress = _batchCompress; _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) { _batchStream.writeBlob(lastRequest); } - return requestNum; + return result; } synchronized public void @@ -218,9 +233,14 @@ public class BatchRequestQueue } } - private void enqueueBatchRequest() + private void enqueueBatchRequest(com.zeroc.Ice.ObjectPrx proxy) { assert(_batchMarker < _batchStream.size()); + Boolean compress = proxy._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -231,6 +251,7 @@ public class BatchRequestQueue private boolean _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private boolean _batchCompress; private BatchRequestI _request; private com.zeroc.Ice.LocalException _exception; private int _maxSize; diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java index a7c4dc65b38..c6193bb48aa 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java @@ -36,7 +36,7 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> complete(null); } - public void flushConnection(final com.zeroc.Ice.ConnectionI con) + public void flushConnection(final com.zeroc.Ice.ConnectionI con, final com.zeroc.Ice.CompressBatch compressBatch) { class FlushBatch extends OutgoingAsyncBaseI<Void> { @@ -106,8 +106,8 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> try { final FlushBatch flushBatch = new FlushBatch(); - final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); - if(batchRequestNum == 0) + final BatchRequestQueue.SwapResult r = con.getBatchRequestQueue().swap(flushBatch.getOs()); + if(r == null) { flushBatch.sent(); } @@ -118,14 +118,40 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> @Override public Void call() throws RetryException { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum); return null; } }); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum); } } catch(RetryException ex) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java index 2d4a35a0f82..588905e9ba5 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java @@ -45,14 +45,13 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void> super.markCompleted(); } - public void invoke() + public void invoke(com.zeroc.Ice.CompressBatch compressBatch) { try { - final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os); - + final BatchRequestQueue.SwapResult r = _connection.getBatchRequestQueue().swap(_os); int status; - if(batchRequestNum == 0) + if(r == null) { status = AsyncStatus.Sent; if(sent()) @@ -68,13 +67,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void> public Integer call() throws RetryException { - return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, r.batchRequestNum); } }); } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + status = _connection.sendAsyncRequest(this, comp, false, r.batchRequestNum); } if((status & AsyncStatus.Sent) > 0) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java index cfa3d1e871b..a2a3de32109 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java @@ -251,7 +251,7 @@ public class FixedReference extends Reference _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - boolean compress; + boolean compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -260,10 +260,6 @@ public class FixedReference extends Reference { compress = _compress; } - else - { - compress = _fixedConnection.endpoint().compress(); - } RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress); if(getInstance().queueRequests()) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java index 98dd813fb66..2aacbaebdc3 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java @@ -192,13 +192,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Con } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { for(ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { try { - outAsync.flushConnection(c); + outAsync.flushConnection(c, compressBatch); } catch(com.zeroc.Ice.LocalException ex) { diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java index 94d1417a663..ca27509d5cc 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java @@ -221,7 +221,7 @@ public final class ObjectAdapterFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<ObjectAdapterI> adapters; synchronized(this) @@ -231,7 +231,7 @@ public final class ObjectAdapterFactory for(ObjectAdapterI adapter : adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java index 9bd778ecfb4..99f29b8cbd1 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java @@ -277,7 +277,7 @@ public final class OutgoingConnectionFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<ConnectionI> c = new java.util.LinkedList<>(); @@ -302,7 +302,7 @@ public final class OutgoingConnectionFactory { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(LocalException ex) { diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java index 527a26658bd..870b6be3099 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java @@ -15,7 +15,8 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBaseI<Void> { super(prx, "ice_flushBatchRequests"); _observer = ObserverHelper.get(prx, "ice_flushBatchRequests"); - _batchRequestNum = prx._getBatchRequestQueue().swap(_os); + BatchRequestQueue.SwapResult r = prx._getBatchRequestQueue().swap(_os); + _batchRequestNum = r != null ? r.batchRequestNum : 0; } @Override diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java index 51cfc65c06e..43a9befc735 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java @@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable return _hashValue; } + public java.lang.Boolean + getCompressOverride() + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue); + } + else if(_overrideCompress) + { + return Boolean.valueOf(_compress); + } + return null; // Null indicates that compress is not overriden. + } + // // Utility methods // |