summaryrefslogtreecommitdiff
path: root/csharp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2017-02-06 11:17:34 +0100
committerBenoit Foucher <benoit@zeroc.com>2017-02-06 11:17:34 +0100
commit18ab8207bd14def950fd399c60d9ee54fab75d3b (patch)
treea82af333127184acc6be6e0969919cb20be5e8b3 /csharp/src
parentFixed ICE-7548 - getAdminProxy no longer returns 0 if synchronization is in p... (diff)
downloadice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.bz2
ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.xz
ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.zip
Fixed ICE-7169 and ICE-7375 - add option to specify if batch requests flushed with the communicator/connection should be compressed
Diffstat (limited to 'csharp/src')
-rw-r--r--csharp/src/Ice/BatchRequestQueue.cs20
-rw-r--r--csharp/src/Ice/CommunicatorI.cs23
-rw-r--r--csharp/src/Ice/ConnectionFactory.cs8
-rw-r--r--csharp/src/Ice/ConnectionI.cs15
-rw-r--r--csharp/src/Ice/ObjectAdapterFactory.cs32
-rw-r--r--csharp/src/Ice/ObjectAdapterI.cs4
-rw-r--r--csharp/src/Ice/OutgoingAsync.cs49
-rw-r--r--csharp/src/Ice/Reference.cs25
8 files changed, 121 insertions, 55 deletions
diff --git a/csharp/src/Ice/BatchRequestQueue.cs b/csharp/src/Ice/BatchRequestQueue.cs
index 5507e879933..a2d3ee13857 100644
--- a/csharp/src/Ice/BatchRequestQueue.cs
+++ b/csharp/src/Ice/BatchRequestQueue.cs
@@ -27,7 +27,7 @@ namespace IceInternal
public void enqueue()
{
- _queue.enqueueBatchRequest();
+ _queue.enqueueBatchRequest(_proxy);
}
public Ice.ObjectPrx getProxy()
@@ -118,6 +118,11 @@ namespace IceInternal
}
else
{
+ bool compress;
+ if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -150,12 +155,13 @@ namespace IceInternal
}
public int
- swap(Ice.OutputStream os)
+ swap(Ice.OutputStream os, out bool compress)
{
lock(this)
{
if(_batchRequestNum == 0)
{
+ compress = false;
return 0;
}
@@ -172,12 +178,14 @@ namespace IceInternal
}
int requestNum = _batchRequestNum;
+ compress = _batchCompress;
_batchStream.swap(os);
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
if(lastRequest != null)
@@ -221,9 +229,14 @@ namespace IceInternal
}
}
- internal void enqueueBatchRequest()
+ internal void enqueueBatchRequest(Ice.ObjectPrx proxy)
{
Debug.Assert(_batchMarker < _batchStream.size());
+ bool compress;
+ if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -234,6 +247,7 @@ namespace IceInternal
private bool _batchStreamCanFlush;
private int _batchRequestNum;
private int _batchMarker;
+ private bool _batchCompress;
private BatchRequestI _request;
private Ice.LocalException _exception;
private int _maxSize;
diff --git a/csharp/src/Ice/CommunicatorI.cs b/csharp/src/Ice/CommunicatorI.cs
index 73a9d0b0dbb..1528a116a06 100644
--- a/csharp/src/Ice/CommunicatorI.cs
+++ b/csharp/src/Ice/CommunicatorI.cs
@@ -163,23 +163,24 @@ namespace Ice
return _instance.pluginManager();
}
- public void flushBatchRequests()
+ public void flushBatchRequests(Ice.CompressBatch compressBatch)
{
- flushBatchRequestsAsync().Wait();
+ flushBatchRequestsAsync(compressBatch).Wait();
}
- public Task flushBatchRequestsAsync(IProgress<bool> progress = null,
+ public Task flushBatchRequestsAsync(Ice.CompressBatch compressBatch,
+ IProgress<bool> progress = null,
CancellationToken cancel = new CancellationToken())
{
var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
var outgoing = new CommunicatorFlushBatchAsync(_instance, completed);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return completed.Task;
}
- public AsyncResult begin_flushBatchRequests()
+ public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch)
{
- return begin_flushBatchRequests(null, null);
+ return begin_flushBatchRequests(compressBatch, null, null);
}
private const string _flushBatchRequests_name = "flushBatchRequests";
@@ -214,11 +215,15 @@ namespace Ice
}
};
- public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie)
+ public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, AsyncCallback cb, object cookie)
{
- var result = new CommunicatorFlushBatchCompletionCallback(this, _instance, _flushBatchRequests_name, cookie, cb);
+ var result = new CommunicatorFlushBatchCompletionCallback(this,
+ _instance,
+ _flushBatchRequests_name,
+ cookie,
+ cb);
var outgoing = new CommunicatorFlushBatchAsync(_instance, result);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return result;
}
diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs
index 936f2b3da57..51e9c992e1e 100644
--- a/csharp/src/Ice/ConnectionFactory.cs
+++ b/csharp/src/Ice/ConnectionFactory.cs
@@ -255,7 +255,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
ICollection<Ice.ConnectionI> c = new List<Ice.ConnectionI>();
@@ -280,7 +280,7 @@ namespace IceInternal
{
try
{
- outAsync.flushConnection(conn);
+ outAsync.flushConnection(conn, compressBatch);
}
catch(Ice.LocalException)
{
@@ -1292,7 +1292,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
//
// connections() is synchronized, no need to synchronize here.
@@ -1301,7 +1301,7 @@ namespace IceInternal
{
try
{
- outAsync.flushConnection(connection);
+ outAsync.flushConnection(connection, compressBatch);
}
catch(Ice.LocalException)
{
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs
index 63a5148742a..a1cbdde8fcf 100644
--- a/csharp/src/Ice/ConnectionI.cs
+++ b/csharp/src/Ice/ConnectionI.cs
@@ -473,9 +473,9 @@ namespace Ice
return _batchRequestQueue;
}
- public void flushBatchRequests()
+ public void flushBatchRequests(CompressBatch compressBatch)
{
- flushBatchRequestsAsync().Wait();
+ flushBatchRequestsAsync(compressBatch).Wait();
}
private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback
@@ -517,21 +517,24 @@ namespace Ice
private Connection _connection;
}
- public Task flushBatchRequestsAsync(IProgress<bool> progress = null,
+ public Task flushBatchRequestsAsync(CompressBatch compressBatch,
+ IProgress<bool> progress = null,
CancellationToken cancel = new CancellationToken())
{
var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return completed.Task;
}
- public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null)
+ public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch,
+ AsyncCallback cb = null,
+ object cookie = null)
{
var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance,
_flushBatchRequests_name, cookie, cb);
var outgoing = new ConnectionFlushBatchAsync(this, _instance, result);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return result;
}
diff --git a/csharp/src/Ice/ObjectAdapterFactory.cs b/csharp/src/Ice/ObjectAdapterFactory.cs
index 17ccfa91b1c..0abdea67a62 100644
--- a/csharp/src/Ice/ObjectAdapterFactory.cs
+++ b/csharp/src/Ice/ObjectAdapterFactory.cs
@@ -28,10 +28,10 @@ namespace IceInternal
}
adapters = new List<Ice.ObjectAdapterI>(_adapters);
-
+
_instance = null;
_communicator = null;
-
+
System.Threading.Monitor.PulseAll(this);
}
@@ -44,7 +44,7 @@ namespace IceInternal
adapter.deactivate();
}
}
-
+
public void waitForShutdown()
{
List<Ice.ObjectAdapterI> adapters;
@@ -57,7 +57,7 @@ namespace IceInternal
{
System.Threading.Monitor.Wait(this);
}
-
+
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
@@ -110,13 +110,13 @@ namespace IceInternal
{
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
adapter.updateConnectionObservers();
}
}
-
+
public void
updateThreadObservers()
{
@@ -125,13 +125,13 @@ namespace IceInternal
{
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
adapter.updateThreadObservers();
}
}
-
+
public Ice.ObjectAdapter createObjectAdapter(string name, Ice.RouterPrx router)
{
lock(this)
@@ -140,7 +140,7 @@ namespace IceInternal
{
throw new Ice.CommunicatorDestroyedException();
}
-
+
Ice.ObjectAdapterI adapter = null;
if(name.Length == 0)
{
@@ -163,7 +163,7 @@ namespace IceInternal
return adapter;
}
}
-
+
public Ice.ObjectAdapter findObjectAdapter(Ice.ObjectPrx proxy)
{
List<Ice.ObjectAdapterI> adapters;
@@ -173,10 +173,10 @@ namespace IceInternal
{
return null;
}
-
+
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
try
@@ -209,7 +209,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
List<Ice.ObjectAdapterI> adapters;
lock(this)
@@ -219,10 +219,10 @@ namespace IceInternal
foreach(Ice.ObjectAdapterI adapter in adapters)
{
- adapter.flushAsyncBatchRequests(outAsync);
+ adapter.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
-
+
//
// Only for use by Instance.
//
@@ -233,7 +233,7 @@ namespace IceInternal
_adapterNamesInUse = new HashSet<string>();
_adapters = new List<Ice.ObjectAdapterI>();
}
-
+
private Instance _instance;
private Ice.Communicator _communicator;
private HashSet<string> _adapterNamesInUse;
diff --git a/csharp/src/Ice/ObjectAdapterI.cs b/csharp/src/Ice/ObjectAdapterI.cs
index 5929b46fd3c..01892c30401 100644
--- a/csharp/src/Ice/ObjectAdapterI.cs
+++ b/csharp/src/Ice/ObjectAdapterI.cs
@@ -688,7 +688,7 @@ namespace Ice
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
List<IncomingConnectionFactory> f;
lock(this)
@@ -698,7 +698,7 @@ namespace Ice
foreach(IncomingConnectionFactory factory in f)
{
- factory.flushAsyncBatchRequests(outAsync);
+ factory.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs
index db0e76d2bc7..74828721b6a 100644
--- a/csharp/src/Ice/OutgoingAsync.cs
+++ b/csharp/src/Ice/OutgoingAsync.cs
@@ -1140,7 +1140,8 @@ namespace IceInternal
{
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));
observer_ = ObserverHelper.get(proxy_, operation, null);
- _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_);
+ bool compress; // Not used for proxy flush batch requests.
+ _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out compress);
invokeImpl(true); // userThread = true
}
@@ -1198,13 +1199,14 @@ namespace IceInternal
_connection = connection;
}
- public void invoke(string operation)
+ public void invoke(string operation, Ice.CompressBatch compressBatch)
{
observer_ = ObserverHelper.get(instance_, operation);
try
{
int status;
- int batchRequestNum = _connection.getBatchRequestQueue().swap(os_);
+ bool compress;
+ int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress);
if(batchRequestNum == 0)
{
status = AsyncStatusSent;
@@ -1215,7 +1217,20 @@ namespace IceInternal
}
else
{
- status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
+ bool comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress;
+ }
+ status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
}
if((status & AsyncStatusSent) != 0)
@@ -1311,7 +1326,7 @@ namespace IceInternal
_useCount = 1;
}
- public void flushConnection(Ice.ConnectionI con)
+ public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)
{
lock(this)
{
@@ -1321,14 +1336,28 @@ namespace IceInternal
try
{
var flushBatch = new FlushBatch(this, instance_, _observer);
- int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
+ bool compress;
+ int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress);
if(batchRequestNum == 0)
{
flushBatch.sent();
}
else
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ bool comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
}
}
catch(Ice.LocalException)
@@ -1338,15 +1367,15 @@ namespace IceInternal
}
}
- public void invoke(string operation)
+ public void invoke(string operation, Ice.CompressBatch compressBatch)
{
_observer = ObserverHelper.get(instance_, operation);
if(_observer != null)
{
_observer.attach();
}
- instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this);
- instance_.objectAdapterFactory().flushAsyncBatchRequests(this);
+ instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this);
+ instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this);
check(true);
}
diff --git a/csharp/src/Ice/Reference.cs b/csharp/src/Ice/Reference.cs
index cfd1019cc74..f786dd5e18e 100644
--- a/csharp/src/Ice/Reference.cs
+++ b/csharp/src/Ice/Reference.cs
@@ -237,6 +237,25 @@ namespace IceInternal
}
}
+ public bool getCompressOverride(out bool compress)
+ {
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else if(overrideCompress_)
+ {
+ compress = compress_;
+ }
+ else
+ {
+ compress = false;
+ return false;
+ }
+ return true;
+ }
+
public abstract bool isIndirect();
public abstract bool isWellKnown();
@@ -709,7 +728,7 @@ namespace IceInternal
_fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- bool compress;
+ bool compress = false;
if(defaultsAndOverrides.overrideCompress)
{
compress = defaultsAndOverrides.overrideCompressValue;
@@ -718,10 +737,6 @@ namespace IceInternal
{
compress = compress_;
}
- else
- {
- compress = _fixedConnection.endpoint().compress();
- }
return proxy.iceSetRequestHandler(new ConnectionRequestHandler(this, _fixedConnection, compress));
}