diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-02-06 11:17:34 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-02-06 11:17:34 +0100 |
commit | 18ab8207bd14def950fd399c60d9ee54fab75d3b (patch) | |
tree | a82af333127184acc6be6e0969919cb20be5e8b3 /java-compat/src | |
parent | Fixed ICE-7548 - getAdminProxy no longer returns 0 if synchronization is in p... (diff) | |
download | ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.bz2 ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.xz ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.zip |
Fixed ICE-7169 and ICE-7375 - add option to specify if batch requests flushed with the communicator/connection should be compressed
Diffstat (limited to 'java-compat/src')
12 files changed, 156 insertions, 70 deletions
diff --git a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java index e4eea1ecf6d..6eda6d0aeaf 100644 --- a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java +++ b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java @@ -212,39 +212,40 @@ public final class CommunicatorI implements Communicator @Override public void - flushBatchRequests() + flushBatchRequests(Ice.CompressBatch compressBatch) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compressBatch)); } @Override public AsyncResult - begin_flushBatchRequests() + begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequestsInternal(null); + return begin_flushBatchRequestsInternal(compressBatch, null); } @Override public AsyncResult - begin_flushBatchRequests(Callback cb) + begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override public AsyncResult - begin_flushBatchRequests(Callback_Communicator_flushBatchRequests cb) + begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback_Communicator_flushBatchRequests cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override public AsyncResult - begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb, + begin_flushBatchRequests(Ice.CompressBatch compressBatch, + IceInternal.Functional_VoidCallback responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, IceInternal.Functional_BoolCallback sentCb) { - return begin_flushBatchRequestsInternal( + return begin_flushBatchRequestsInternal(compressBatch, new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) { @Override @@ -265,7 +266,7 @@ public final class CommunicatorI implements Communicator private static final String _flushBatchRequests_name = "flushBatchRequests"; private Ice.AsyncResult - begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch, IceInternal.CallbackBase cb) { IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory(); IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory(); @@ -279,8 +280,8 @@ public final class CommunicatorI implements Communicator _flushBatchRequests_name, cb); - connectionFactory.flushAsyncBatchRequests(result); - adapterFactory.flushAsyncBatchRequests(result); + connectionFactory.flushAsyncBatchRequests(compressBatch, result); + adapterFactory.flushAsyncBatchRequests(compressBatch, result); // // Inform the callback that we have finished initiating all of the diff --git a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java index 84131a38cfc..0130c676dd5 100644 --- a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java +++ b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java @@ -420,58 +420,62 @@ public final class ConnectionI extends IceInternal.EventHandler } @Override - public void flushBatchRequests() + public void flushBatchRequests(Ice.CompressBatch compressBatch) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compressBatch)); } private static final String _flushBatchRequests_name = "flushBatchRequests"; @Override - public Ice.AsyncResult begin_flushBatchRequests() + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequestsInternal(null); + return begin_flushBatchRequestsInternal(compressBatch, null); } @Override - public Ice.AsyncResult begin_flushBatchRequests(Callback cb) + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override - public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, + Callback_Connection_flushBatchRequests cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override - public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb, - IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, - IceInternal.Functional_BoolCallback sentCb) - { - return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) - { - @Override - public final void _iceCompleted(AsyncResult result) - { - try - { - result.getConnection().end_flushBatchRequests(result); - } - catch(Exception ex) - { - _exceptionCb.apply(ex); - } - } - }); - } - - private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, + IceInternal.Functional_VoidCallback responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, + IceInternal.Functional_BoolCallback sentCb) + { + return begin_flushBatchRequestsInternal(compressBatch, + new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) + { + @Override + public final void _iceCompleted(AsyncResult result) + { + try + { + result.getConnection().end_flushBatchRequests(result); + } + catch(Exception ex) + { + _exceptionCb.apply(ex); + } + } + }); + } + + private Ice.AsyncResult begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch, + IceInternal.CallbackBase cb) { IceInternal.ConnectionFlushBatch result = new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, _flushBatchRequests_name, cb); - result.invoke(); + result.invoke(compressBatch); return result; } diff --git a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java index 9ab45240784..4b4029abbd2 100644 --- a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java +++ b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java @@ -746,7 +746,7 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushAsyncBatchRequests(IceInternal.CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, IceInternal.CommunicatorFlushBatch outAsync) { List<IncomingConnectionFactory> f; synchronized(this) @@ -755,7 +755,7 @@ public final class ObjectAdapterI implements ObjectAdapter } for(IncomingConnectionFactory p : f) { - p.flushAsyncBatchRequests(outAsync); + p.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java index 3db2ac67177..6d7f2030ac0 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java @@ -23,7 +23,7 @@ public class BatchRequestQueue @Override public void enqueue() { - enqueueBatchRequest(); + enqueueBatchRequest(_proxy); } @Override @@ -59,6 +59,7 @@ public class BatchRequestQueue _batchStream = new Ice.OutputStream(instance, Protocol.currentProtocolEncoding); _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); + _batchCompress = false; _request = new BatchRequestI(); _maxSize = instance.batchAutoFlushSize(); @@ -112,6 +113,11 @@ public class BatchRequestQueue } else { + Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -141,7 +147,7 @@ public class BatchRequestQueue } synchronized public int - swap(Ice.OutputStream os) + swap(Ice.OutputStream os, Ice.BooleanHolder compress) { if(_batchRequestNum == 0) { @@ -161,12 +167,17 @@ public class BatchRequestQueue } int requestNum = _batchRequestNum; + if(compress != null) + { + compress.value = _batchCompress; + } _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) @@ -218,9 +229,14 @@ public class BatchRequestQueue } } - private void enqueueBatchRequest() + private void enqueueBatchRequest(Ice.ObjectPrx proxy) { assert(_batchMarker < _batchStream.size()); + Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -231,6 +247,7 @@ public class BatchRequestQueue private boolean _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private boolean _batchCompress; private BatchRequestI _request; private Ice.LocalException _exception; private int _maxSize; diff --git a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java index 309c9dd6a55..d6da14b8f24 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java @@ -43,7 +43,7 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI _useCount = 1; } - public void flushConnection(final Ice.ConnectionI con) + public void flushConnection(final Ice.ConnectionI con, final Ice.CompressBatch compressBatch) { class FlushBatch extends OutgoingAsyncBase { @@ -96,7 +96,8 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI try { final FlushBatch flushBatch = new FlushBatch(); - final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); + final Ice.BooleanHolder compress = new Ice.BooleanHolder(); + final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), compress); if(batchRequestNum == 0) { flushBatch.sent(); @@ -108,14 +109,40 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI @Override public Void call() throws RetryException { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); return null; } }); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); } } catch(RetryException ex) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java index 7824d3154c1..e1839020fa6 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java @@ -42,12 +42,12 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase return _connection; } - public void invoke() + public void invoke(final Ice.CompressBatch compressBatch) { try { - final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os); - + final Ice.BooleanHolder compress = new Ice.BooleanHolder(); + final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os, compress); int status; if(batchRequestNum == 0) { @@ -64,13 +64,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase @Override public Integer call() throws RetryException { - return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, batchRequestNum); } }); } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum); } if((status & AsyncStatus.Sent) > 0) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java index ebc59c5e690..b9e84ce5d0e 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java @@ -250,7 +250,7 @@ public class FixedReference extends Reference _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - boolean compress; + boolean compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -259,10 +259,6 @@ public class FixedReference extends Reference { compress = _compress; } - else - { - compress = _fixedConnection.endpoint().compress(); - } RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress); if(getInstance().queueRequests()) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java index 108df4607e0..60514dfd230 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java @@ -196,13 +196,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { try { - outAsync.flushConnection(c); + outAsync.flushConnection(c, compressBatch); } catch(Ice.LocalException ex) { diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java index bfec67de6a6..c44a0b2ef40 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java @@ -218,7 +218,7 @@ public final class ObjectAdapterFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<Ice.ObjectAdapterI> adapters; synchronized(this) @@ -228,7 +228,7 @@ public final class ObjectAdapterFactory for(Ice.ObjectAdapterI adapter : adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java index b51856cf3e6..3102f3a500f 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java @@ -271,7 +271,7 @@ public final class OutgoingConnectionFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>(); @@ -296,7 +296,7 @@ public final class OutgoingConnectionFactory { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(Ice.LocalException ex) { diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java index 7a53d1b2c24..c83f46d964a 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java @@ -28,7 +28,7 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase { super(prx, operation, callback); _observer = ObserverHelper.get(prx, operation); - _batchRequestNum = prx._getBatchRequestQueue().swap(_os); + _batchRequestNum = prx._getBatchRequestQueue().swap(_os, null); } @Override diff --git a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java index a1869b1fcda..c3b0e1b8d76 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java @@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable return _hashValue; } + public java.lang.Boolean + getCompressOverride() + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue); + } + else if(_overrideCompress) + { + return Boolean.valueOf(_compress); + } + return null; // Null indicates that compress is not overriden. + } + // // Utility methods // |