diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-02-06 11:17:34 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-02-06 11:17:34 +0100 |
commit | 18ab8207bd14def950fd399c60d9ee54fab75d3b (patch) | |
tree | a82af333127184acc6be6e0969919cb20be5e8b3 /csharp/src | |
parent | Fixed ICE-7548 - getAdminProxy no longer returns 0 if synchronization is in p... (diff) | |
download | ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.bz2 ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.xz ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.zip |
Fixed ICE-7169 and ICE-7375 - add option to specify if batch requests flushed with the communicator/connection should be compressed
Diffstat (limited to 'csharp/src')
-rw-r--r-- | csharp/src/Ice/BatchRequestQueue.cs | 20 | ||||
-rw-r--r-- | csharp/src/Ice/CommunicatorI.cs | 23 | ||||
-rw-r--r-- | csharp/src/Ice/ConnectionFactory.cs | 8 | ||||
-rw-r--r-- | csharp/src/Ice/ConnectionI.cs | 15 | ||||
-rw-r--r-- | csharp/src/Ice/ObjectAdapterFactory.cs | 32 | ||||
-rw-r--r-- | csharp/src/Ice/ObjectAdapterI.cs | 4 | ||||
-rw-r--r-- | csharp/src/Ice/OutgoingAsync.cs | 49 | ||||
-rw-r--r-- | csharp/src/Ice/Reference.cs | 25 |
8 files changed, 121 insertions, 55 deletions
diff --git a/csharp/src/Ice/BatchRequestQueue.cs b/csharp/src/Ice/BatchRequestQueue.cs index 5507e879933..a2d3ee13857 100644 --- a/csharp/src/Ice/BatchRequestQueue.cs +++ b/csharp/src/Ice/BatchRequestQueue.cs @@ -27,7 +27,7 @@ namespace IceInternal public void enqueue() { - _queue.enqueueBatchRequest(); + _queue.enqueueBatchRequest(_proxy); } public Ice.ObjectPrx getProxy() @@ -118,6 +118,11 @@ namespace IceInternal } else { + bool compress; + if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -150,12 +155,13 @@ namespace IceInternal } public int - swap(Ice.OutputStream os) + swap(Ice.OutputStream os, out bool compress) { lock(this) { if(_batchRequestNum == 0) { + compress = false; return 0; } @@ -172,12 +178,14 @@ namespace IceInternal } int requestNum = _batchRequestNum; + compress = _batchCompress; _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) @@ -221,9 +229,14 @@ namespace IceInternal } } - internal void enqueueBatchRequest() + internal void enqueueBatchRequest(Ice.ObjectPrx proxy) { Debug.Assert(_batchMarker < _batchStream.size()); + bool compress; + if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -234,6 +247,7 @@ namespace IceInternal private bool _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private bool _batchCompress; private BatchRequestI _request; private Ice.LocalException _exception; private int _maxSize; diff --git a/csharp/src/Ice/CommunicatorI.cs b/csharp/src/Ice/CommunicatorI.cs index 73a9d0b0dbb..1528a116a06 100644 --- a/csharp/src/Ice/CommunicatorI.cs +++ b/csharp/src/Ice/CommunicatorI.cs @@ -163,23 +163,24 @@ namespace Ice return _instance.pluginManager(); } - public void flushBatchRequests() + public void flushBatchRequests(Ice.CompressBatch compressBatch) { - flushBatchRequestsAsync().Wait(); + flushBatchRequestsAsync(compressBatch).Wait(); } - public Task flushBatchRequestsAsync(IProgress<bool> progress = null, + public Task flushBatchRequestsAsync(Ice.CompressBatch compressBatch, + IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) { var completed = new FlushBatchTaskCompletionCallback(progress, cancel); var outgoing = new CommunicatorFlushBatchAsync(_instance, completed); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return completed.Task; } - public AsyncResult begin_flushBatchRequests() + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequests(null, null); + return begin_flushBatchRequests(compressBatch, null, null); } private const string _flushBatchRequests_name = "flushBatchRequests"; @@ -214,11 +215,15 @@ namespace Ice } }; - public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie) + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, AsyncCallback cb, object cookie) { - var result = new CommunicatorFlushBatchCompletionCallback(this, _instance, _flushBatchRequests_name, cookie, cb); + var result = new CommunicatorFlushBatchCompletionCallback(this, + _instance, + _flushBatchRequests_name, + cookie, + cb); var outgoing = new CommunicatorFlushBatchAsync(_instance, result); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return result; } diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs index 936f2b3da57..51e9c992e1e 100644 --- a/csharp/src/Ice/ConnectionFactory.cs +++ b/csharp/src/Ice/ConnectionFactory.cs @@ -255,7 +255,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { ICollection<Ice.ConnectionI> c = new List<Ice.ConnectionI>(); @@ -280,7 +280,7 @@ namespace IceInternal { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(Ice.LocalException) { @@ -1292,7 +1292,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { // // connections() is synchronized, no need to synchronize here. @@ -1301,7 +1301,7 @@ namespace IceInternal { try { - outAsync.flushConnection(connection); + outAsync.flushConnection(connection, compressBatch); } catch(Ice.LocalException) { diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 63a5148742a..a1cbdde8fcf 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -473,9 +473,9 @@ namespace Ice return _batchRequestQueue; } - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - flushBatchRequestsAsync().Wait(); + flushBatchRequestsAsync(compressBatch).Wait(); } private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback @@ -517,21 +517,24 @@ namespace Ice private Connection _connection; } - public Task flushBatchRequestsAsync(IProgress<bool> progress = null, + public Task flushBatchRequestsAsync(CompressBatch compressBatch, + IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) { var completed = new FlushBatchTaskCompletionCallback(progress, cancel); var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return completed.Task; } - public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null) + public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch, + AsyncCallback cb = null, + object cookie = null) { var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance, _flushBatchRequests_name, cookie, cb); var outgoing = new ConnectionFlushBatchAsync(this, _instance, result); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return result; } diff --git a/csharp/src/Ice/ObjectAdapterFactory.cs b/csharp/src/Ice/ObjectAdapterFactory.cs index 17ccfa91b1c..0abdea67a62 100644 --- a/csharp/src/Ice/ObjectAdapterFactory.cs +++ b/csharp/src/Ice/ObjectAdapterFactory.cs @@ -28,10 +28,10 @@ namespace IceInternal } adapters = new List<Ice.ObjectAdapterI>(_adapters); - + _instance = null; _communicator = null; - + System.Threading.Monitor.PulseAll(this); } @@ -44,7 +44,7 @@ namespace IceInternal adapter.deactivate(); } } - + public void waitForShutdown() { List<Ice.ObjectAdapterI> adapters; @@ -57,7 +57,7 @@ namespace IceInternal { System.Threading.Monitor.Wait(this); } - + adapters = new List<Ice.ObjectAdapterI>(_adapters); } @@ -110,13 +110,13 @@ namespace IceInternal { adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { adapter.updateConnectionObservers(); } } - + public void updateThreadObservers() { @@ -125,13 +125,13 @@ namespace IceInternal { adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { adapter.updateThreadObservers(); } } - + public Ice.ObjectAdapter createObjectAdapter(string name, Ice.RouterPrx router) { lock(this) @@ -140,7 +140,7 @@ namespace IceInternal { throw new Ice.CommunicatorDestroyedException(); } - + Ice.ObjectAdapterI adapter = null; if(name.Length == 0) { @@ -163,7 +163,7 @@ namespace IceInternal return adapter; } } - + public Ice.ObjectAdapter findObjectAdapter(Ice.ObjectPrx proxy) { List<Ice.ObjectAdapterI> adapters; @@ -173,10 +173,10 @@ namespace IceInternal { return null; } - + adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { try @@ -209,7 +209,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { List<Ice.ObjectAdapterI> adapters; lock(this) @@ -219,10 +219,10 @@ namespace IceInternal foreach(Ice.ObjectAdapterI adapter in adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } - + // // Only for use by Instance. // @@ -233,7 +233,7 @@ namespace IceInternal _adapterNamesInUse = new HashSet<string>(); _adapters = new List<Ice.ObjectAdapterI>(); } - + private Instance _instance; private Ice.Communicator _communicator; private HashSet<string> _adapterNamesInUse; diff --git a/csharp/src/Ice/ObjectAdapterI.cs b/csharp/src/Ice/ObjectAdapterI.cs index 5929b46fd3c..01892c30401 100644 --- a/csharp/src/Ice/ObjectAdapterI.cs +++ b/csharp/src/Ice/ObjectAdapterI.cs @@ -688,7 +688,7 @@ namespace Ice } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { List<IncomingConnectionFactory> f; lock(this) @@ -698,7 +698,7 @@ namespace Ice foreach(IncomingConnectionFactory factory in f) { - factory.flushAsyncBatchRequests(outAsync); + factory.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs index db0e76d2bc7..74828721b6a 100644 --- a/csharp/src/Ice/OutgoingAsync.cs +++ b/csharp/src/Ice/OutgoingAsync.cs @@ -1140,7 +1140,8 @@ namespace IceInternal { Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol())); observer_ = ObserverHelper.get(proxy_, operation, null); - _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_); + bool compress; // Not used for proxy flush batch requests. + _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out compress); invokeImpl(true); // userThread = true } @@ -1198,13 +1199,14 @@ namespace IceInternal _connection = connection; } - public void invoke(string operation) + public void invoke(string operation, Ice.CompressBatch compressBatch) { observer_ = ObserverHelper.get(instance_, operation); try { int status; - int batchRequestNum = _connection.getBatchRequestQueue().swap(os_); + bool compress; + int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress); if(batchRequestNum == 0) { status = AsyncStatusSent; @@ -1215,7 +1217,20 @@ namespace IceInternal } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + bool comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress; + } + status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum); } if((status & AsyncStatusSent) != 0) @@ -1311,7 +1326,7 @@ namespace IceInternal _useCount = 1; } - public void flushConnection(Ice.ConnectionI con) + public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch) { lock(this) { @@ -1321,14 +1336,28 @@ namespace IceInternal try { var flushBatch = new FlushBatch(this, instance_, _observer); - int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); + bool compress; + int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress); if(batchRequestNum == 0) { flushBatch.sent(); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + bool comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); } } catch(Ice.LocalException) @@ -1338,15 +1367,15 @@ namespace IceInternal } } - public void invoke(string operation) + public void invoke(string operation, Ice.CompressBatch compressBatch) { _observer = ObserverHelper.get(instance_, operation); if(_observer != null) { _observer.attach(); } - instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this); - instance_.objectAdapterFactory().flushAsyncBatchRequests(this); + instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this); + instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this); check(true); } diff --git a/csharp/src/Ice/Reference.cs b/csharp/src/Ice/Reference.cs index cfd1019cc74..f786dd5e18e 100644 --- a/csharp/src/Ice/Reference.cs +++ b/csharp/src/Ice/Reference.cs @@ -237,6 +237,25 @@ namespace IceInternal } } + public bool getCompressOverride(out bool compress) + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else if(overrideCompress_) + { + compress = compress_; + } + else + { + compress = false; + return false; + } + return true; + } + public abstract bool isIndirect(); public abstract bool isWellKnown(); @@ -709,7 +728,7 @@ namespace IceInternal _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - bool compress; + bool compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -718,10 +737,6 @@ namespace IceInternal { compress = compress_; } - else - { - compress = _fixedConnection.endpoint().compress(); - } return proxy.iceSetRequestHandler(new ConnectionRequestHandler(this, _fixedConnection, compress)); } |