summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG-3.7.md6
-rw-r--r--cpp/include/Ice/OutgoingAsync.h86
-rw-r--r--cpp/include/Ice/Proxy.h43
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp2
-rw-r--r--cpp/src/Ice/BatchRequestQueue.cpp24
-rw-r--r--cpp/src/Ice/BatchRequestQueue.h5
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp197
-rw-r--r--cpp/src/Ice/CommunicatorI.h51
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp14
-rw-r--r--cpp/src/Ice/ConnectionFactory.h4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp125
-rw-r--r--cpp/src/Ice/ConnectionI.h18
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.cpp7
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.h2
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp5
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h2
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp293
-rw-r--r--cpp/src/Ice/Proxy.cpp88
-rw-r--r--cpp/src/Ice/Reference.cpp25
-rw-r--r--cpp/src/Ice/Reference.h4
-rw-r--r--cpp/src/slice2cpp/Gen.cpp7
-rw-r--r--cpp/src/slice2objc/Gen.cpp32
-rw-r--r--cpp/test/Ice/ami/AllTests.cpp67
-rw-r--r--cpp/test/Ice/operations/BatchOneways.cpp48
-rw-r--r--csharp/src/Ice/BatchRequestQueue.cs20
-rw-r--r--csharp/src/Ice/CommunicatorI.cs23
-rw-r--r--csharp/src/Ice/ConnectionFactory.cs8
-rw-r--r--csharp/src/Ice/ConnectionI.cs15
-rw-r--r--csharp/src/Ice/ObjectAdapterFactory.cs32
-rw-r--r--csharp/src/Ice/ObjectAdapterI.cs4
-rw-r--r--csharp/src/Ice/OutgoingAsync.cs49
-rw-r--r--csharp/src/Ice/Reference.cs25
-rw-r--r--csharp/test/Ice/ami/AllTests.cs76
-rw-r--r--csharp/test/Ice/operations/BatchOneways.cs36
-rw-r--r--java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java27
-rw-r--r--java-compat/src/Ice/src/main/java/Ice/ConnectionI.java66
-rw-r--r--java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java4
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java23
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java35
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java36
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java6
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java4
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java4
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java4
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java2
-rw-r--r--java-compat/src/Ice/src/main/java/IceInternal/Reference.java15
-rw-r--r--java-compat/test/src/main/java/test/Ice/ami/AMI.java18
-rw-r--r--java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java8
-rw-r--r--java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java10
-rw-r--r--java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java41
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java14
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java11
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java5
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java33
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java36
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java37
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java6
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java4
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java4
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java4
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java3
-rw-r--r--java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java15
-rw-r--r--java/test/src/main/java/test/Ice/ami/AMI.java22
-rw-r--r--java/test/src/main/java/test/Ice/interrupt/AllTests.java9
-rw-r--r--java/test/src/main/java/test/Ice/operations/BatchOneways.java41
-rw-r--r--js/src/Ice/ConnectRequestHandler.js11
-rw-r--r--js/src/Ice/ConnectionI.js30
-rw-r--r--js/src/Ice/ConnectionRequestHandler.js5
-rw-r--r--js/src/Ice/DefaultsAndOverrides.js7
-rw-r--r--js/src/Ice/IncomingAsync.js19
-rw-r--r--js/src/Ice/ObjectPrx.js19
-rw-r--r--js/src/Ice/OutgoingAsync.js20
-rw-r--r--js/src/Ice/OutgoingConnectionFactory.js36
-rw-r--r--js/src/Ice/Reference.js79
-rw-r--r--js/src/Ice/RequestHandlerFactory.js4
-rw-r--r--js/test/Ice/proxy/Client.js5
-rw-r--r--objective-c/src/Ice/CommunicatorI.mm18
-rw-r--r--objective-c/src/Ice/ConnectionI.mm18
-rw-r--r--objective-c/test/Ice/ami/AllTests.m27
-rw-r--r--php/src/php5/Communicator.cpp18
-rw-r--r--php/src/php5/Connection.cpp14
-rw-r--r--php/test/Ice/operations/Client.php11
-rw-r--r--python/modules/IcePy/Communicator.cpp64
-rw-r--r--python/modules/IcePy/Connection.cpp66
-rw-r--r--python/python/Ice.py12
-rw-r--r--python/test/Ice/ami/AllTests.py46
-rw-r--r--python/test/Ice/operations/BatchOneways.py3
-rw-r--r--ruby/src/IceRuby/Communicator.cpp16
-rw-r--r--ruby/src/IceRuby/Connection.cpp17
-rw-r--r--ruby/test/Ice/operations/BatchOneways.rb4
-rwxr-xr-xscripts/Expect.py2
-rw-r--r--slice/Ice/Communicator.ice6
-rw-r--r--slice/Ice/Connection.ice29
93 files changed, 1618 insertions, 978 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md
index 8a9fa92ec08..d1a8814430b 100644
--- a/CHANGELOG-3.7.md
+++ b/CHANGELOG-3.7.md
@@ -17,6 +17,12 @@ These are the changes since Ice 3.6.3.
## General Changes
+- The communicator and connection flushBatchRequests method now takes
+ an additional argument to specify whether or not the batch requests
+ to flush should be compressed. See the documentation of the
+ Ice::CompressBatch enumeration for the different options available
+ to specify whether or not the batch should be compressed.
+
- The UDP server endpoint now supports specifying `--interface *` to join the
multicast group on using all the local interfaces. It's also now the default
behavior if no `--interface` option is specified.
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index f401975a569..c82884f5ab9 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -304,92 +304,6 @@ protected:
bool _synchronous;
};
-//
-// Class for handling the proxy's begin_ice_flushBatchRequest request.
-//
-class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase
-{
-public:
-
- ProxyFlushBatchAsync(const Ice::ObjectPrxPtr&);
-
- virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
- virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
-
- void invoke(const std::string&);
-
-private:
-
- int _batchRequestNum;
-};
-typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr;
-
-//
-// Class for handling the proxy's begin_ice_getConnection request.
-//
-class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase
-{
-public:
-
- ProxyGetConnection(const Ice::ObjectPrxPtr&);
-
- virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
- virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
-
- virtual Ice::ConnectionPtr getConnection() const;
-
- void invoke(const std::string&);
-};
-typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr;
-
-//
-// Class for handling Ice::Connection::begin_flushBatchRequests
-//
-class ICE_API ConnectionFlushBatchAsync : public OutgoingAsyncBase
-{
-public:
-
- ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&);
-
- virtual Ice::ConnectionPtr getConnection() const;
-
- void invoke(const std::string&);
-
-private:
-
- const Ice::ConnectionIPtr _connection;
-};
-typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr;
-
-//
-// Class for handling Ice::Communicator::begin_flushBatchRequests
-//
-class ICE_API CommunicatorFlushBatchAsync : public OutgoingAsyncBase
-{
-public:
-
- virtual ~CommunicatorFlushBatchAsync();
-
- CommunicatorFlushBatchAsync(const InstancePtr&);
-
- void flushConnection(const Ice::ConnectionIPtr&);
- void invoke(const std::string&);
-
-#ifdef ICE_CPP11_MAPPING
- std::shared_ptr<CommunicatorFlushBatchAsync> shared_from_this()
- {
- return std::static_pointer_cast<CommunicatorFlushBatchAsync>(OutgoingAsyncBase::shared_from_this());
- }
-#endif
-
-private:
-
- void check(bool);
-
- int _useCount;
- InvocationObserver _observer;
-};
-
}
namespace IceInternal
diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h
index e996c1324ab..461c873ccdd 100644
--- a/cpp/include/Ice/Proxy.h
+++ b/cpp/include/Ice/Proxy.h
@@ -38,6 +38,49 @@ ICE_API extern const Context noExplicitContext;
}
+namespace IceInternal
+{
+
+//
+// Class for handling the proxy's begin_ice_flushBatchRequest request.
+//
+class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase
+{
+public:
+
+ ProxyFlushBatchAsync(const Ice::ObjectPrxPtr&);
+
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
+ virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
+
+ void invoke(const std::string&);
+
+private:
+
+ int _batchRequestNum;
+};
+typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr;
+
+//
+// Class for handling the proxy's begin_ice_getConnection request.
+//
+class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase
+{
+public:
+
+ ProxyGetConnection(const Ice::ObjectPrxPtr&);
+
+ virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool);
+ virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*);
+
+ virtual Ice::ConnectionPtr getConnection() const;
+
+ void invoke(const std::string&);
+};
+typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr;
+
+}
+
#ifdef ICE_CPP11_MAPPING // C++11 mapping
namespace IceInternal
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 5acc66dbfa9..423b52b4e87 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -374,7 +374,7 @@ Glacier2::RequestQueue::flush()
if(flushBatchRequests)
{
- Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback);
+ Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(Ice::BasedOnProxy, _flushCallback);
if(!result->sentSynchronously() && !result->isCompleted())
{
_pendingSend = true;
diff --git a/cpp/src/Ice/BatchRequestQueue.cpp b/cpp/src/Ice/BatchRequestQueue.cpp
index 5eb2758b00e..50b69a5436e 100644
--- a/cpp/src/Ice/BatchRequestQueue.cpp
+++ b/cpp/src/Ice/BatchRequestQueue.cpp
@@ -10,6 +10,7 @@
#include <Ice/BatchRequestQueue.h>
#include <Ice/Instance.h>
#include <Ice/Properties.h>
+#include <Ice/Reference.h>
using namespace std;
using namespace Ice;
@@ -34,7 +35,7 @@ public:
virtual void
enqueue() const
{
- _queue.enqueueBatchRequest();
+ _queue.enqueueBatchRequest(_proxy);
}
virtual int
@@ -70,6 +71,7 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram)
_batchStream(instance.get(), Ice::currentProtocolEncoding),
_batchStreamInUse(false),
_batchStreamCanFlush(false),
+ _batchCompress(false),
_batchRequestNum(0)
{
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
@@ -101,7 +103,9 @@ BatchRequestQueue::prepareBatchRequest(OutputStream* os)
}
void
-BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr& proxy, const std::string& operation)
+BatchRequestQueue::finishBatchRequest(OutputStream* os,
+ const Ice::ObjectPrxPtr& proxy,
+ const std::string& operation)
{
//
// No need for synchronization, no other threads are supposed
@@ -135,6 +139,11 @@ BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr&
}
else
{
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}
@@ -170,7 +179,7 @@ BatchRequestQueue::abortBatchRequest(OutputStream* os)
}
int
-BatchRequestQueue::swap(OutputStream* os)
+BatchRequestQueue::swap(OutputStream* os, bool& compress)
{
Lock sync(*this);
if(_batchRequestNum == 0)
@@ -189,11 +198,13 @@ BatchRequestQueue::swap(OutputStream* os)
int requestNum = _batchRequestNum;
_batchStream.swap(*os);
+ compress = _batchCompress;
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
_batchMarker = _batchStream.b.size();
if(!lastRequest.empty())
@@ -231,9 +242,14 @@ BatchRequestQueue::waitStreamInUse(bool flush)
}
void
-BatchRequestQueue::enqueueBatchRequest()
+BatchRequestQueue::enqueueBatchRequest(const Ice::ObjectPrxPtr& proxy)
{
assert(_batchMarker < _batchStream.b.size());
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}
diff --git a/cpp/src/Ice/BatchRequestQueue.h b/cpp/src/Ice/BatchRequestQueue.h
index 878b2a2ef6d..9bdc2b90790 100644
--- a/cpp/src/Ice/BatchRequestQueue.h
+++ b/cpp/src/Ice/BatchRequestQueue.h
@@ -33,12 +33,12 @@ public:
void finishBatchRequest(Ice::OutputStream*, const Ice::ObjectPrxPtr&, const std::string&);
void abortBatchRequest(Ice::OutputStream*);
- int swap(Ice::OutputStream*);
+ int swap(Ice::OutputStream*, bool&);
void destroy(const Ice::LocalException&);
bool isEmpty();
- void enqueueBatchRequest();
+ void enqueueBatchRequest(const Ice::ObjectPrxPtr&);
private:
@@ -52,6 +52,7 @@ private:
Ice::OutputStream _batchStream;
bool _batchStreamInUse;
bool _batchStreamCanFlush;
+ bool _batchCompress;
int _batchRequestNum;
size_t _batchMarker;
IceInternal::UniquePtr<Ice::LocalException> _exception;
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index c47a3299c19..9b9e00ce239 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -29,6 +29,168 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
+#endif
+
+CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
+{
+ // Out of line to avoid weak vtable
+}
+
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
+ OutgoingAsyncBase(instance)
+{
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+}
+
+void
+CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con, Ice::CompressBatch compressBatch)
+{
+ class FlushBatch : public OutgoingAsyncBase
+ {
+ public:
+
+ FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
+ {
+ }
+
+ virtual bool
+ sent()
+ {
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual bool
+ exception(const Exception& ex)
+ {
+ _childObserver.failed(ex.ice_id());
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual InvocationObserver&
+ getObserver()
+ {
+ return _observer;
+ }
+
+ virtual bool handleSent(bool, bool)
+ {
+ return false;
+ }
+
+ virtual bool handleException(const Ice::Exception&)
+ {
+ return false;
+ }
+
+ virtual bool handleResponse(bool)
+ {
+ return false;
+ }
+
+ virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ private:
+
+ const CommunicatorFlushBatchAsyncPtr _outAsync;
+ InvocationObserver& _observer;
+ };
+
+ {
+ Lock sync(_m);
+ ++_useCount;
+ }
+
+ try
+ {
+ OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
+ bool compress;
+ int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs(), compress);
+ if(batchRequestNum == 0)
+ {
+ flushBatch->sent();
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ con->sendAsyncRequest(flushBatch, compress, false, batchRequestNum);
+ }
+ }
+ catch(const LocalException&)
+ {
+ check(false);
+ throw;
+ }
+}
+
+void
+CommunicatorFlushBatchAsync::invoke(const string& operation, CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ check(true);
+}
+
+void
+CommunicatorFlushBatchAsync::check(bool userThread)
+{
+ {
+ Lock sync(_m);
+ assert(_useCount > 0);
+ if(--_useCount > 0)
+ {
+ return;
+ }
+ }
+
+ if(sentImpl(true))
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ invokeSent();
+ }
+ else
+ {
+ invokeSentAsync();
+ }
+ }
+}
+
void
Ice::CommunicatorI::destroy() ICE_NOEXCEPT
{
@@ -210,13 +372,15 @@ const ::std::string flushBatchRequests_name = "flushBatchRequests";
#ifdef ICE_CPP11_MAPPING
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- Communicator::flushBatchRequestsAsync().get();
+ Communicator::flushBatchRequestsAsync(compress).get();
}
::std::function<void()>
-Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, function<void(bool)> sent)
+Ice::CommunicatorI::flushBatchRequestsAsync(CompressBatch compress,
+ function<void(exception_ptr)> ex,
+ function<void(bool)> sent)
{
class CommunicatorFlushBatchLambda : public CommunicatorFlushBatchAsync, public LambdaInvoke
{
@@ -230,39 +394,44 @@ Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, fu
}
};
auto outAsync = make_shared<CommunicatorFlushBatchLambda>(_instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests()
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(::IceInternal::dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, ::IceInternal::dummyCallback, 0);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr& cb,
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Communicator_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const IceInternal::CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class CommunicatorFlushBatchAsyncWithCallback : public CommunicatorFlushBatchAsync, public CallbackCompletion
{
@@ -294,7 +463,7 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBa
};
CommunicatorFlushBatchAsyncPtr result = new CommunicatorFlushBatchAsyncWithCallback(this, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}
diff --git a/cpp/src/Ice/CommunicatorI.h b/cpp/src/Ice/CommunicatorI.h
index c1ef6279068..5c9035261f8 100644
--- a/cpp/src/Ice/CommunicatorI.h
+++ b/cpp/src/Ice/CommunicatorI.h
@@ -16,6 +16,41 @@
#include <Ice/Initialize.h>
#include <Ice/Communicator.h>
#include <Ice/CommunicatorAsync.h>
+#include <Ice/OutgoingAsync.h>
+
+namespace IceInternal
+{
+
+//
+// Class for handling Ice::Communicator::begin_flushBatchRequests
+//
+class CommunicatorFlushBatchAsync : public OutgoingAsyncBase
+{
+public:
+
+ virtual ~CommunicatorFlushBatchAsync();
+
+ CommunicatorFlushBatchAsync(const InstancePtr&);
+
+ void flushConnection(const Ice::ConnectionIPtr&, Ice::CompressBatch);
+ void invoke(const std::string&, Ice::CompressBatch);
+
+#ifdef ICE_CPP11_MAPPING
+ std::shared_ptr<CommunicatorFlushBatchAsync> shared_from_this()
+ {
+ return std::static_pointer_cast<CommunicatorFlushBatchAsync>(OutgoingAsyncBase::shared_from_this());
+ }
+#endif
+
+private:
+
+ void check(bool);
+
+ int _useCount;
+ InvocationObserver _observer;
+};
+
+}
namespace Ice
{
@@ -68,16 +103,18 @@ public:
virtual ValueFactoryManagerPtr getValueFactoryManager() const;
- virtual void flushBatchRequests();
+ virtual void flushBatchRequests(CompressBatch);
#ifdef ICE_CPP11_MAPPING
virtual ::std::function<void()>
- flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>,
+ flushBatchRequestsAsync(CompressBatch,
+ ::std::function<void(::std::exception_ptr)>,
::std::function<void(bool)> = nullptr);
#else
- virtual AsyncResultPtr begin_flushBatchRequests();
- virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0);
- virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr&,
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch,
+ const Callback_Communicator_flushBatchRequestsPtr&,
const LocalObjectPtr& = 0);
virtual void end_flushBatchRequests(const AsyncResultPtr&);
@@ -113,7 +150,9 @@ private:
friend ICE_API ::IceUtil::TimerPtr IceInternal::getInstanceTimer(const ::Ice::CommunicatorPtr&);
#ifndef ICE_CPP11_MAPPING
- AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch,
+ const IceInternal::CallbackBasePtr&,
+ const LocalObjectPtr&);
#endif
const ::IceInternal::InstancePtr _instance;
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 755ab969003..0234c1c33dd 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -25,6 +25,7 @@
#include <Ice/LocalException.h>
#include <Ice/Functional.h>
#include <Ice/OutgoingAsync.h>
+#include <Ice/CommunicatorI.h>
#include <IceUtil/Random.h>
#include <iterator>
@@ -223,7 +224,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
void
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts,
+ bool hasMore,
Ice::EndpointSelectionType selType,
const CreateConnectionCallbackPtr& callback)
{
@@ -335,7 +337,8 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
}
void
-IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c;
@@ -355,7 +358,7 @@ IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
@@ -1276,7 +1279,8 @@ IceInternal::IncomingConnectionFactory::connections() const
}
void
-IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here.
@@ -1284,7 +1288,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 69331e04f77..b8058ab593e 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -66,7 +66,7 @@ public:
void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, const CreateConnectionCallbackPtr&);
void setRouterInfo(const RouterInfoPtr&);
void removeAdapter(const Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch);
OutgoingConnectionFactory(const Ice::CommunicatorPtr&, const InstancePtr&);
virtual ~OutgoingConnectionFactory();
@@ -191,7 +191,7 @@ public:
EndpointIPtr endpoint() const;
std::list<Ice::ConnectionIPtr> connections() const;
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch);
//
// Operations from EventHandler
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 23388d399c6..598f4c983d3 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -133,6 +133,25 @@ private:
const bool _close;
};
+//
+// Class for handling Ice::Connection::begin_flushBatchRequests
+//
+class ConnectionFlushBatchAsync : public OutgoingAsyncBase
+{
+public:
+
+ ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&);
+
+ virtual Ice::ConnectionPtr getConnection() const;
+
+ void invoke(const std::string&, Ice::CompressBatch);
+
+private:
+
+ const Ice::ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr;
+
ConnectionState connectionStateMap[] = {
ConnectionStateValidating, // StateNotInitialized
ConnectionStateValidating, // StateNotValidated
@@ -146,6 +165,72 @@ ConnectionState connectionStateMap[] = {
}
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
+ OutgoingAsyncBase(instance), _connection(connection)
+{
+}
+
+ConnectionPtr
+ConnectionFlushBatchAsync::getConnection() const
+{
+ return _connection;
+}
+
+void
+ConnectionFlushBatchAsync::invoke(const string& operation, Ice::CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ try
+ {
+ AsyncStatus status;
+ bool compress;
+ int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os, compress);
+ if(batchRequestNum == 0)
+ {
+ status = AsyncStatusSent;
+ if(sent())
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, batchRequestNum);
+ }
+
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+}
+
Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
{
}
@@ -698,13 +783,14 @@ Ice::ConnectionI::getBatchRequestQueue() const
#ifdef ICE_CPP11_MAPPING
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- Connection::flushBatchRequestsAsync().get();
+ Connection::flushBatchRequestsAsync(compress).get();
}
std::function<void()>
-Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex,
+Ice::ConnectionI::flushBatchRequestsAsync(CompressBatch compress,
+ ::std::function<void(::std::exception_ptr)> ex,
::std::function<void(bool)> sent)
{
class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke
@@ -720,37 +806,42 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_
}
};
auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests()
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, dummyCallback, 0);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb,
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Connection_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion
{
@@ -791,8 +882,12 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, cons
Ice::ConnectionPtr _connection;
};
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this,
+ _communicator,
+ _instance,
+ cb,
+ cookie);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 7fb3781e880..bd6398362df 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -175,16 +175,18 @@ public:
IceInternal::BatchRequestQueuePtr getBatchRequestQueue() const;
- virtual void flushBatchRequests();
+ virtual void flushBatchRequests(CompressBatch);
#ifdef ICE_CPP11_MAPPING
virtual std::function<void()>
- flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>,
- ::std::function<void(bool)> = nullptr);
+ flushBatchRequestsAsync(CompressBatch,
+ ::std::function<void(::std::exception_ptr)>,
+ ::std::function<void(bool)> = nullptr);
#else
- virtual AsyncResultPtr begin_flushBatchRequests();
- virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0);
- virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr&,
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch,
+ const Callback_Connection_flushBatchRequestsPtr&,
const LocalObjectPtr& = 0);
virtual void end_flushBatchRequests(const AsyncResultPtr&);
@@ -320,7 +322,9 @@ private:
void reap();
#ifndef ICE_CPP11_MAPPING
- AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch,
+ const IceInternal::CallbackBasePtr&,
+ const LocalObjectPtr&);
AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
#endif
diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp
index 47398bc8720..2e9dd66ff0c 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.cpp
+++ b/cpp/src/Ice/ObjectAdapterFactory.cpp
@@ -134,7 +134,7 @@ IceInternal::ObjectAdapterFactory::updateObservers(void (ObjectAdapterI::*fn)())
adapters = _adapters;
}
#ifdef ICE_CPP11_MAPPING
- for_each(adapters.begin(), adapters.end(),
+ for_each(adapters.begin(), adapters.end(),
[fn](const ObjectAdapterIPtr& adapter)
{
(adapter.get() ->* fn)();
@@ -231,7 +231,8 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a
}
void
-IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) const
+IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ CompressBatch compressBatch) const
{
list<ObjectAdapterIPtr> adapters;
{
@@ -242,7 +243,7 @@ IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlu
for(list<ObjectAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- (*p)->flushAsyncBatchRequests(outAsync);
+ (*p)->flushAsyncBatchRequests(outAsync, compressBatch);
}
}
diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h
index 93a6aede7df..523ec46b304 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.h
+++ b/cpp/src/Ice/ObjectAdapterFactory.h
@@ -38,7 +38,7 @@ public:
::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrxPtr&);
::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrxPtr&);
void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&) const;
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, ::Ice::CompressBatch) const;
ObjectAdapterFactory(const InstancePtr&, const ::Ice::CommunicatorPtr&);
virtual ~ObjectAdapterFactory();
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index ac60baa4a0c..e7c6e529095 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -9,6 +9,7 @@
#include <Ice/UUID.h>
#include <Ice/ObjectAdapterI.h>
+#include <Ice/CommunicatorI.h>
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/Instance.h>
#include <Ice/Proxy.h>
@@ -786,7 +787,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrxPtr& proxy) const
}
void
-Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, CompressBatch compress)
{
vector<IncomingConnectionFactoryPtr> f;
{
@@ -796,7 +797,7 @@ Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPt
for(vector<IncomingConnectionFactoryPtr>::const_iterator p = f.begin(); p != f.end(); ++p)
{
- (*p)->flushAsyncBatchRequests(outAsync);
+ (*p)->flushAsyncBatchRequests(outAsync, compress);
}
}
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index 855d836f471..d86ce43235c 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -92,7 +92,7 @@ public:
bool isLocal(const ObjectPrxPtr&) const;
- void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&, CompressBatch);
void updateConnectionObservers();
void updateThreadObservers();
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index bd82a31e9a5..15a2d819260 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -29,7 +29,6 @@ using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
#endif
const unsigned char OutgoingAsyncBase::OK = 0x1;
@@ -1181,298 +1180,6 @@ OutgoingAsync::throwUserException()
#endif
-ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy)
-{
-}
-
-AsyncStatus
-ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool)
-{
- if(_batchRequestNum == 0)
- {
- if(sent())
- {
- return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
- }
- else
- {
- return AsyncStatusSent;
- }
- }
- _cachedConnection = connection;
- return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum);
-}
-
-AsyncStatus
-ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
-{
- if(_batchRequestNum == 0)
- {
- if(sent())
- {
- return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
- }
- else
- {
- return AsyncStatusSent;
- }
- }
- return handler->invokeAsyncRequest(this, _batchRequestNum, false);
-}
-
-void
-ProxyFlushBatchAsync::invoke(const string& operation)
-{
- checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol()));
- _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
- _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os);
- invokeImpl(true); // userThread = true
-}
-
-ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx)
-{
-}
-
-AsyncStatus
-ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
-{
- _cachedConnection = connection;
- if(responseImpl(true))
- {
- invokeResponseAsync();
- }
- return AsyncStatusSent;
-}
-
-AsyncStatus
-ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
-{
- if(responseImpl(true))
- {
- invokeResponseAsync();
- }
- return AsyncStatusSent;
-}
-
-Ice::ConnectionPtr
-ProxyGetConnection::getConnection() const
-{
- return _cachedConnection;
-}
-
-void
-ProxyGetConnection::invoke(const string& operation)
-{
- _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
- invokeImpl(true); // userThread = true
-}
-
-ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
- OutgoingAsyncBase(instance), _connection(connection)
-{
-}
-
-ConnectionPtr
-ConnectionFlushBatchAsync::getConnection() const
-{
- return _connection;
-}
-
-void
-ConnectionFlushBatchAsync::invoke(const string& operation)
-{
- _observer.attach(_instance.get(), operation);
- try
- {
- AsyncStatus status;
- int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os);
- if(batchRequestNum == 0)
- {
- status = AsyncStatusSent;
- if(sent())
- {
- status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
- }
- }
- else
- {
- status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum);
- }
-
- if(status & AsyncStatusSent)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
- {
- invokeSent();
- }
- }
- }
- catch(const RetryException& ex)
- {
- if(exception(*ex.get()))
- {
- invokeExceptionAsync();
- }
- }
- catch(const Exception& ex)
- {
- if(exception(ex))
- {
- invokeExceptionAsync();
- }
- }
-}
-
-CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
-{
- // Out of line to avoid weak vtable
-}
-
-CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
- OutgoingAsyncBase(instance)
-{
- //
- // _useCount is initialized to 1 to prevent premature callbacks.
- // The caller must invoke ready() after all flush requests have
- // been initiated.
- //
- _useCount = 1;
-}
-
-void
-CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
-{
- class FlushBatch : public OutgoingAsyncBase
- {
- public:
-
- FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
- const InstancePtr& instance,
- InvocationObserver& observer) :
- OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
- {
- }
-
- virtual bool
- sent()
- {
- _childObserver.detach();
- _outAsync->check(false);
- return false;
- }
-
- virtual bool
- exception(const Exception& ex)
- {
- _childObserver.failed(ex.ice_id());
- _childObserver.detach();
- _outAsync->check(false);
- return false;
- }
-
- virtual InvocationObserver&
- getObserver()
- {
- return _observer;
- }
-
- virtual bool handleSent(bool, bool)
- {
- return false;
- }
-
- virtual bool handleException(const Ice::Exception&)
- {
- return false;
- }
-
- virtual bool handleResponse(bool)
- {
- return false;
- }
-
- virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- private:
-
- const CommunicatorFlushBatchAsyncPtr _outAsync;
- InvocationObserver& _observer;
- };
-
- {
- Lock sync(_m);
- ++_useCount;
- }
-
- try
- {
- OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
- int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs());
- if(batchRequestNum == 0)
- {
- flushBatch->sent();
- }
- else
- {
- con->sendAsyncRequest(flushBatch, false, false, batchRequestNum);
- }
- }
- catch(const LocalException&)
- {
- check(false);
- throw;
- }
-}
-
-void
-CommunicatorFlushBatchAsync::invoke(const string& operation)
-{
- _observer.attach(_instance.get(), operation);
- _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
- _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
- check(true);
-}
-
-void
-CommunicatorFlushBatchAsync::check(bool userThread)
-{
- {
- Lock sync(_m);
- assert(_useCount > 0);
- if(--_useCount > 0)
- {
- return;
- }
- }
-
- if(sentImpl(true))
- {
- if(userThread)
- {
- _sentSynchronously = true;
- invokeSent();
- }
- else
- {
- invokeSentAsync();
- }
- }
-}
-
#ifdef ICE_CPP11_MAPPING
bool
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 4c36e74e369..2535c05c1e4 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -14,6 +14,7 @@
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/Reference.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/EndpointI.h>
#include <Ice/Instance.h>
#include <Ice/RouterInfo.h>
@@ -48,6 +49,93 @@ const string ice_flushBatchRequests_name = "ice_flushBatchRequests";
}
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy)
+{
+}
+
+AsyncStatus
+ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool)
+{
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
+ _cachedConnection = connection;
+ return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum);
+}
+
+AsyncStatus
+ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
+ return handler->invokeAsyncRequest(this, _batchRequestNum, false);
+}
+
+void
+ProxyFlushBatchAsync::invoke(const string& operation)
+{
+ checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol()));
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
+ bool compress; // Ignore for proxy flushBatchRequests
+ _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os, compress);
+ invokeImpl(true); // userThread = true
+}
+
+ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx)
+{
+}
+
+AsyncStatus
+ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
+{
+ _cachedConnection = connection;
+ if(responseImpl(true))
+ {
+ invokeResponseAsync();
+ }
+ return AsyncStatusSent;
+}
+
+AsyncStatus
+ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
+{
+ if(responseImpl(true))
+ {
+ invokeResponseAsync();
+ }
+ return AsyncStatusSent;
+}
+
+Ice::ConnectionPtr
+ProxyGetConnection::getConnection() const
+{
+ return _cachedConnection;
+}
+
+void
+ProxyGetConnection::invoke(const string& operation)
+{
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
+ invokeImpl(true); // userThread = true
+}
+
#ifdef ICE_CPP11_MAPPING // C++11 mapping
namespace Ice
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp
index 188ad33e5ee..31cd7631479 100644
--- a/cpp/src/Ice/Reference.cpp
+++ b/cpp/src/Ice/Reference.cpp
@@ -174,6 +174,25 @@ IceInternal::Reference::changeCompress(bool newCompress) const
return r;
}
+bool
+IceInternal::Reference::getCompressOverride(bool& compress) const
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = getInstance()->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else if(_overrideCompress)
+ {
+ compress = _compress;
+ }
+ else
+ {
+ return false;
+ }
+ return true;
+}
+
Int
Reference::hash() const
{
@@ -791,7 +810,7 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c
_fixedConnection->throwException(); // Throw in case our connection is already destroyed.
- bool compress;
+ bool compress = false;
if(defaultsAndOverrides->overrideCompress)
{
compress = defaultsAndOverrides->overrideCompressValue;
@@ -800,10 +819,6 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c
{
compress = _compress;
}
- else
- {
- compress = _fixedConnection->endpoint()->compress();
- }
ReferencePtr ref = const_cast<FixedReference*>(this);
return proxy->_setRequestHandler(ICE_MAKE_SHARED(ConnectionRequestHandler, ref, _fixedConnection, compress));
diff --git a/cpp/src/Ice/Reference.h b/cpp/src/Ice/Reference.h
index a48a61c66e4..7a19ab3525f 100644
--- a/cpp/src/Ice/Reference.h
+++ b/cpp/src/Ice/Reference.h
@@ -41,7 +41,7 @@ class Reference : public IceUtil::Shared
{
public:
- class GetConnectionCallback
+ class GetConnectionCallback
#ifndef ICE_CPP11_MAPPING
: public virtual IceUtil::Shared
#endif
@@ -116,6 +116,8 @@ public:
int hash() const; // Conceptually const.
+ bool getCompressOverride(bool&) const;
+
//
// Utility methods.
//
diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp
index 7beb1fb7371..0173509b925 100644
--- a/cpp/src/slice2cpp/Gen.cpp
+++ b/cpp/src/slice2cpp/Gen.cpp
@@ -6620,7 +6620,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
{
vector<string> paramsDeclAMI;
vector<string> outParamsDeclAMI;
-
+ vector<string> paramsArgAMI;
ParamDeclList paramList = p->parameters();
for(ParamDeclList::const_iterator r = paramList.begin(); r != paramList.end(); ++r)
{
@@ -6632,6 +6632,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
{
typeString = inputTypeToString((*r)->type(), (*r)->optional(), metaData, typeCtx);
paramsDeclAMI.push_back(typeString + ' ' + paramName);
+ paramsArgAMI.push_back(paramName);
}
}
@@ -6663,11 +6664,11 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
H << nl << name << "Async(";
H.useCurrentPosAsIndent();
- for(vector<string>::const_iterator i = paramsDeclAMI.begin(); i != paramsDeclAMI.end(); ++i)
+ for(vector<string>::const_iterator i = paramsArgAMI.begin(); i != paramsArgAMI.end(); ++i)
{
H << *i << ",";
}
- if(!paramsDeclAMI.empty())
+ if(!paramsArgAMI.empty())
{
H << nl;
}
diff --git a/cpp/src/slice2objc/Gen.cpp b/cpp/src/slice2objc/Gen.cpp
index 80b5609417e..7ad7d9883e6 100644
--- a/cpp/src/slice2objc/Gen.cpp
+++ b/cpp/src/slice2objc/Gen.cpp
@@ -1214,15 +1214,29 @@ Slice::Gen::TypesVisitor::visitOperation(const OperationPtr& p)
if(cl->isLocal() && (cl->hasMetaData("async-oneway") || p->hasMetaData("async-oneway")))
{
- // TODO: add support for parameters when needed.
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name << deprecateSymbol << ";";
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name << ":(void(^)(ICEException*))exception"
- << deprecateSymbol << ";";
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name
- << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent"
- << deprecateSymbol << ";";
- _H << nl << "-(void) end_" << name << ":(id<ICEAsyncResult>)result"
- << deprecateSymbol << ";";
+ string marshalParams = getMarshalParams(p);
+ string unmarshalParams = getUnmarshalParams(p);
+
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams << deprecateSymbol << ";";
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams;
+ if(!marshalParams.empty())
+ {
+ _H << " exception";
+ }
+ _H << ":(void(^)(ICEException*))exception" << deprecateSymbol << ";";
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams;
+ if(!marshalParams.empty())
+ {
+ _H << " exception";
+ }
+ _H << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent" << deprecateSymbol << ";";
+
+ _H << nl << "-(void) end_" << name << unmarshalParams;
+ if(!unmarshalParams.empty())
+ {
+ _H << " result";
+ }
+ _H << ":(id<ICEAsyncResult>)result" << deprecateSymbol << ";";
}
}
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index 3366c4fdc7e..d285f0498ab 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -1901,6 +1901,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
b1->ice_getConnection()->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](const exception_ptr& ex)
{
promise.set_exception(ex);
@@ -1920,6 +1921,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
auto id = this_thread::get_id();
promise<void> promise;
p->ice_getConnection()->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](const exception_ptr& ex)
{
promise.set_exception(ex);
@@ -1943,6 +1945,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
b1->ice_getConnection()->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](exception_ptr ex)
{
promise.set_exception(move(ex));
@@ -1977,7 +1980,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
- communicator->flushBatchRequestsAsync().get();
+ communicator->flushBatchRequestsAsync(Ice::CompressBatch::BasedOnProxy).get();
test(p->waitForBatch(2));
}
@@ -1994,6 +1997,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
auto id = this_thread::get_id();
communicator->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](exception_ptr ex)
{
promise.set_exception(move(ex));
@@ -2022,6 +2026,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
auto id = this_thread::get_id();
communicator->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](exception_ptr ex)
{
promise.set_exception(move(ex));
@@ -2056,6 +2061,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
auto id = this_thread::get_id();
communicator->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](exception_ptr ex)
{
promise.set_exception(move(ex));
@@ -2093,6 +2099,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
promise<void> promise;
auto id = this_thread::get_id();
communicator->flushBatchRequestsAsync(
+ Ice::CompressBatch::BasedOnProxy,
[&](exception_ptr ex)
{
promise.set_exception(move(ex));
@@ -2287,7 +2294,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
//
// This test requires two threads in the server's thread pool: one will block in sleep() and the other
// will process the CloseConnection message.
- //
+ //
p->ice_ping();
auto con = p->ice_getConnection();
auto s = make_shared<promise<void>>();
@@ -2346,7 +2353,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
//
// Local case: start a lengthy operation and then close the connection forcefully on the client side.
// There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
- //
+ //
p->ice_ping();
auto con = p->ice_getConnection();
auto s = make_shared<promise<void>>();
@@ -3208,7 +3215,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent());
@@ -3217,7 +3224,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
// Ensure it also works with a twoway proxy
cb = new FlushCallback();
- r = p->ice_getConnection()->begin_flushBatchRequests(
+ r = p->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent());
@@ -3234,7 +3241,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback(cookie);
- b1->ice_getConnection()->begin_flushBatchRequests(
+ b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
cb->check();
test(p->waitForBatch(2));
@@ -3251,7 +3258,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
- Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync));
cb->check();
test(!r->isSent());
@@ -3270,7 +3277,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
- b1->ice_getConnection()->begin_flushBatchRequests(
+ b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie);
cb->check();
test(p->opBatchCount() == 0);
@@ -3286,7 +3293,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent));
cb->check();
test(r->isSent());
@@ -3304,7 +3311,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback(cookie);
- b1->ice_getConnection()->begin_flushBatchRequests(
+ b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushCallback::exceptionWC,
&FlushCallback::sentWC), cookie);
cb->check();
@@ -3322,7 +3329,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
- Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception,
&FlushExCallback::sent));
cb->check();
@@ -3342,7 +3349,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
- b1->ice_getConnection()->begin_flushBatchRequests(
+ b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC,
&FlushExCallback::sentWC), cookie);
cb->check();
@@ -3365,7 +3372,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent());
@@ -3383,7 +3390,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback(cookie);
- communicator->begin_flushBatchRequests(
+ communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
cb->check();
test(p->waitForBatch(2));
@@ -3400,7 +3407,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent()); // Exceptions are ignored!
@@ -3419,7 +3426,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
- communicator->begin_flushBatchRequests(
+ communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
cb->check();
test(p->opBatchCount() == 0);
@@ -3442,7 +3449,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->opBatch();
b2->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent());
@@ -3470,7 +3477,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent()); // Exceptions are ignored!
@@ -3498,7 +3505,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
cb->check();
test(r->isSent()); // Exceptions are ignored!
@@ -3516,7 +3523,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
&FlushCallback::sent));
cb->check();
@@ -3535,7 +3542,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->opBatch();
FlushCallbackPtr cb = new FlushCallback(cookie);
- communicator->begin_flushBatchRequests(
+ communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC,
&FlushCallback::sentWC), cookie);
cb->check();
@@ -3553,7 +3560,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
&FlushCallback::sent));
cb->check();
@@ -3573,7 +3580,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
- communicator->begin_flushBatchRequests(
+ communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC,
&FlushCallback::sentWC), cookie);
cb->check();
@@ -3598,7 +3605,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->opBatch();
b2->opBatch();
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
&FlushCallback::sent));
cb->check();
@@ -3627,7 +3634,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->opBatch();
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
&FlushCallback::sent));
cb->check();
@@ -3656,7 +3663,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
- Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
+ Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy,
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
&FlushCallback::sent));
cb->check();
@@ -3775,7 +3782,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Ice::ConnectionPtr con = p->ice_getConnection();
p2 = p->ice_batchOneway();
p2->ice_ping();
- r = con->begin_flushBatchRequests();
+ r = con->begin_flushBatchRequests(Ice::BasedOnProxy);
test(r->getConnection() == con);
test(r->getCommunicator() == communicator);
test(!r->getProxy()); // Expected
@@ -3786,7 +3793,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
//
p2 = p->ice_batchOneway();
p2->ice_ping();
- r = communicator->begin_flushBatchRequests();
+ r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy);
test(!r->getConnection()); // Expected
test(r->getCommunicator() == communicator);
test(!r->getProxy()); // Expected
@@ -3961,7 +3968,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
//
// This test requires two threads in the server's thread pool: one will block in sleep() and the other
// will process the CloseConnection message.
- //
+ //
p->ice_ping();
Ice::ConnectionPtr con = p->ice_getConnection();
Ice::AsyncResultPtr r = p->begin_sleep(100);
@@ -4009,7 +4016,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
//
// Local case: start a lengthy operation and then close the connection forcefully on the client side.
// There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
- //
+ //
p->ice_ping();
Ice::ConnectionPtr con = p->ice_getConnection();
Ice::AsyncResultPtr r = p->begin_sleep(100);
diff --git a/cpp/test/Ice/operations/BatchOneways.cpp b/cpp/test/Ice/operations/BatchOneways.cpp
index e3d261cf7e3..c7466821faa 100644
--- a/cpp/test/Ice/operations/BatchOneways.cpp
+++ b/cpp/test/Ice/operations/BatchOneways.cpp
@@ -89,6 +89,11 @@ batchOneways(const Test::MyClassPrxPtr& p)
Test::MyClassPrxPtr batch = ICE_UNCHECKED_CAST(Test::MyClassPrx, p->ice_batchOneway());
batch->ice_flushBatchRequests(); // Empty flush
+ if(batch->ice_getConnection())
+ {
+ batch->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy);
+ }
+ batch->ice_getCommunicator()->flushBatchRequests(Ice::BasedOnProxy);
int i;
p->opByteSOnewayCallCount(); // Reset the call count
@@ -154,11 +159,10 @@ batchOneways(const Test::MyClassPrxPtr& p)
BatchRequestInterceptorIPtr interceptor = ICE_MAKE_SHARED(BatchRequestInterceptorI);
#if defined(ICE_CPP11_MAPPING)
- initData.batchRequestInterceptor =
- [=](const Ice::BatchRequest& request, int count, int size)
- {
- interceptor->enqueue(request, count, size);
- };
+ initData.batchRequestInterceptor = [=](const Ice::BatchRequest& request, int count, int size)
+ {
+ interceptor->enqueue(request, count, size);
+ };
#else
initData.batchRequestInterceptor = interceptor;
#endif
@@ -195,4 +199,38 @@ batchOneways(const Test::MyClassPrxPtr& p)
ic->destroy();
}
+ if(batch->ice_getConnection() &&
+ p->ice_getCommunicator()->getProperties()->getProperty("Ice.Override.Compress") == "")
+ {
+ Ice::ObjectPrxPtr prx = batch->ice_getConnection()->createProxy(batch->ice_getIdentity())->ice_batchOneway();
+
+ Test::MyClassPrxPtr batch1 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_compress(false));
+ Test::MyClassPrxPtr batch2 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_compress(true));
+ Test::MyClassPrxPtr batch3 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_identity(identity));
+
+ batch1->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->ice_getConnection()->flushBatchRequests(Ice::Yes);
+
+ batch2->opByteSOneway(bs1);
+ batch2->opByteSOneway(bs1);
+ batch2->opByteSOneway(bs1);
+ batch1->ice_getConnection()->flushBatchRequests(Ice::No);
+
+ batch1->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy);
+
+ batch1->opByteSOneway(bs1);
+ batch2->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy);
+
+ batch1->opByteSOneway(bs1);
+ batch3->opByteSOneway(bs1);
+ batch1->opByteSOneway(bs1);
+ batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy);
+ }
}
diff --git a/csharp/src/Ice/BatchRequestQueue.cs b/csharp/src/Ice/BatchRequestQueue.cs
index 5507e879933..a2d3ee13857 100644
--- a/csharp/src/Ice/BatchRequestQueue.cs
+++ b/csharp/src/Ice/BatchRequestQueue.cs
@@ -27,7 +27,7 @@ namespace IceInternal
public void enqueue()
{
- _queue.enqueueBatchRequest();
+ _queue.enqueueBatchRequest(_proxy);
}
public Ice.ObjectPrx getProxy()
@@ -118,6 +118,11 @@ namespace IceInternal
}
else
{
+ bool compress;
+ if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -150,12 +155,13 @@ namespace IceInternal
}
public int
- swap(Ice.OutputStream os)
+ swap(Ice.OutputStream os, out bool compress)
{
lock(this)
{
if(_batchRequestNum == 0)
{
+ compress = false;
return 0;
}
@@ -172,12 +178,14 @@ namespace IceInternal
}
int requestNum = _batchRequestNum;
+ compress = _batchCompress;
_batchStream.swap(os);
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
if(lastRequest != null)
@@ -221,9 +229,14 @@ namespace IceInternal
}
}
- internal void enqueueBatchRequest()
+ internal void enqueueBatchRequest(Ice.ObjectPrx proxy)
{
Debug.Assert(_batchMarker < _batchStream.size());
+ bool compress;
+ if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -234,6 +247,7 @@ namespace IceInternal
private bool _batchStreamCanFlush;
private int _batchRequestNum;
private int _batchMarker;
+ private bool _batchCompress;
private BatchRequestI _request;
private Ice.LocalException _exception;
private int _maxSize;
diff --git a/csharp/src/Ice/CommunicatorI.cs b/csharp/src/Ice/CommunicatorI.cs
index 73a9d0b0dbb..1528a116a06 100644
--- a/csharp/src/Ice/CommunicatorI.cs
+++ b/csharp/src/Ice/CommunicatorI.cs
@@ -163,23 +163,24 @@ namespace Ice
return _instance.pluginManager();
}
- public void flushBatchRequests()
+ public void flushBatchRequests(Ice.CompressBatch compressBatch)
{
- flushBatchRequestsAsync().Wait();
+ flushBatchRequestsAsync(compressBatch).Wait();
}
- public Task flushBatchRequestsAsync(IProgress<bool> progress = null,
+ public Task flushBatchRequestsAsync(Ice.CompressBatch compressBatch,
+ IProgress<bool> progress = null,
CancellationToken cancel = new CancellationToken())
{
var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
var outgoing = new CommunicatorFlushBatchAsync(_instance, completed);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return completed.Task;
}
- public AsyncResult begin_flushBatchRequests()
+ public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch)
{
- return begin_flushBatchRequests(null, null);
+ return begin_flushBatchRequests(compressBatch, null, null);
}
private const string _flushBatchRequests_name = "flushBatchRequests";
@@ -214,11 +215,15 @@ namespace Ice
}
};
- public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie)
+ public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, AsyncCallback cb, object cookie)
{
- var result = new CommunicatorFlushBatchCompletionCallback(this, _instance, _flushBatchRequests_name, cookie, cb);
+ var result = new CommunicatorFlushBatchCompletionCallback(this,
+ _instance,
+ _flushBatchRequests_name,
+ cookie,
+ cb);
var outgoing = new CommunicatorFlushBatchAsync(_instance, result);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return result;
}
diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs
index 936f2b3da57..51e9c992e1e 100644
--- a/csharp/src/Ice/ConnectionFactory.cs
+++ b/csharp/src/Ice/ConnectionFactory.cs
@@ -255,7 +255,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
ICollection<Ice.ConnectionI> c = new List<Ice.ConnectionI>();
@@ -280,7 +280,7 @@ namespace IceInternal
{
try
{
- outAsync.flushConnection(conn);
+ outAsync.flushConnection(conn, compressBatch);
}
catch(Ice.LocalException)
{
@@ -1292,7 +1292,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
//
// connections() is synchronized, no need to synchronize here.
@@ -1301,7 +1301,7 @@ namespace IceInternal
{
try
{
- outAsync.flushConnection(connection);
+ outAsync.flushConnection(connection, compressBatch);
}
catch(Ice.LocalException)
{
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs
index 63a5148742a..a1cbdde8fcf 100644
--- a/csharp/src/Ice/ConnectionI.cs
+++ b/csharp/src/Ice/ConnectionI.cs
@@ -473,9 +473,9 @@ namespace Ice
return _batchRequestQueue;
}
- public void flushBatchRequests()
+ public void flushBatchRequests(CompressBatch compressBatch)
{
- flushBatchRequestsAsync().Wait();
+ flushBatchRequestsAsync(compressBatch).Wait();
}
private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback
@@ -517,21 +517,24 @@ namespace Ice
private Connection _connection;
}
- public Task flushBatchRequestsAsync(IProgress<bool> progress = null,
+ public Task flushBatchRequestsAsync(CompressBatch compressBatch,
+ IProgress<bool> progress = null,
CancellationToken cancel = new CancellationToken())
{
var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return completed.Task;
}
- public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null)
+ public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch,
+ AsyncCallback cb = null,
+ object cookie = null)
{
var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance,
_flushBatchRequests_name, cookie, cb);
var outgoing = new ConnectionFlushBatchAsync(this, _instance, result);
- outgoing.invoke(_flushBatchRequests_name);
+ outgoing.invoke(_flushBatchRequests_name, compressBatch);
return result;
}
diff --git a/csharp/src/Ice/ObjectAdapterFactory.cs b/csharp/src/Ice/ObjectAdapterFactory.cs
index 17ccfa91b1c..0abdea67a62 100644
--- a/csharp/src/Ice/ObjectAdapterFactory.cs
+++ b/csharp/src/Ice/ObjectAdapterFactory.cs
@@ -28,10 +28,10 @@ namespace IceInternal
}
adapters = new List<Ice.ObjectAdapterI>(_adapters);
-
+
_instance = null;
_communicator = null;
-
+
System.Threading.Monitor.PulseAll(this);
}
@@ -44,7 +44,7 @@ namespace IceInternal
adapter.deactivate();
}
}
-
+
public void waitForShutdown()
{
List<Ice.ObjectAdapterI> adapters;
@@ -57,7 +57,7 @@ namespace IceInternal
{
System.Threading.Monitor.Wait(this);
}
-
+
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
@@ -110,13 +110,13 @@ namespace IceInternal
{
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
adapter.updateConnectionObservers();
}
}
-
+
public void
updateThreadObservers()
{
@@ -125,13 +125,13 @@ namespace IceInternal
{
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
adapter.updateThreadObservers();
}
}
-
+
public Ice.ObjectAdapter createObjectAdapter(string name, Ice.RouterPrx router)
{
lock(this)
@@ -140,7 +140,7 @@ namespace IceInternal
{
throw new Ice.CommunicatorDestroyedException();
}
-
+
Ice.ObjectAdapterI adapter = null;
if(name.Length == 0)
{
@@ -163,7 +163,7 @@ namespace IceInternal
return adapter;
}
}
-
+
public Ice.ObjectAdapter findObjectAdapter(Ice.ObjectPrx proxy)
{
List<Ice.ObjectAdapterI> adapters;
@@ -173,10 +173,10 @@ namespace IceInternal
{
return null;
}
-
+
adapters = new List<Ice.ObjectAdapterI>(_adapters);
}
-
+
foreach(Ice.ObjectAdapterI adapter in adapters)
{
try
@@ -209,7 +209,7 @@ namespace IceInternal
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
List<Ice.ObjectAdapterI> adapters;
lock(this)
@@ -219,10 +219,10 @@ namespace IceInternal
foreach(Ice.ObjectAdapterI adapter in adapters)
{
- adapter.flushAsyncBatchRequests(outAsync);
+ adapter.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
-
+
//
// Only for use by Instance.
//
@@ -233,7 +233,7 @@ namespace IceInternal
_adapterNamesInUse = new HashSet<string>();
_adapters = new List<Ice.ObjectAdapterI>();
}
-
+
private Instance _instance;
private Ice.Communicator _communicator;
private HashSet<string> _adapterNamesInUse;
diff --git a/csharp/src/Ice/ObjectAdapterI.cs b/csharp/src/Ice/ObjectAdapterI.cs
index 5929b46fd3c..01892c30401 100644
--- a/csharp/src/Ice/ObjectAdapterI.cs
+++ b/csharp/src/Ice/ObjectAdapterI.cs
@@ -688,7 +688,7 @@ namespace Ice
}
}
- public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync)
+ public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
{
List<IncomingConnectionFactory> f;
lock(this)
@@ -698,7 +698,7 @@ namespace Ice
foreach(IncomingConnectionFactory factory in f)
{
- factory.flushAsyncBatchRequests(outAsync);
+ factory.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs
index db0e76d2bc7..74828721b6a 100644
--- a/csharp/src/Ice/OutgoingAsync.cs
+++ b/csharp/src/Ice/OutgoingAsync.cs
@@ -1140,7 +1140,8 @@ namespace IceInternal
{
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol()));
observer_ = ObserverHelper.get(proxy_, operation, null);
- _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_);
+ bool compress; // Not used for proxy flush batch requests.
+ _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out compress);
invokeImpl(true); // userThread = true
}
@@ -1198,13 +1199,14 @@ namespace IceInternal
_connection = connection;
}
- public void invoke(string operation)
+ public void invoke(string operation, Ice.CompressBatch compressBatch)
{
observer_ = ObserverHelper.get(instance_, operation);
try
{
int status;
- int batchRequestNum = _connection.getBatchRequestQueue().swap(os_);
+ bool compress;
+ int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress);
if(batchRequestNum == 0)
{
status = AsyncStatusSent;
@@ -1215,7 +1217,20 @@ namespace IceInternal
}
else
{
- status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
+ bool comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress;
+ }
+ status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
}
if((status & AsyncStatusSent) != 0)
@@ -1311,7 +1326,7 @@ namespace IceInternal
_useCount = 1;
}
- public void flushConnection(Ice.ConnectionI con)
+ public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)
{
lock(this)
{
@@ -1321,14 +1336,28 @@ namespace IceInternal
try
{
var flushBatch = new FlushBatch(this, instance_, _observer);
- int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
+ bool compress;
+ int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress);
if(batchRequestNum == 0)
{
flushBatch.sent();
}
else
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ bool comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
}
}
catch(Ice.LocalException)
@@ -1338,15 +1367,15 @@ namespace IceInternal
}
}
- public void invoke(string operation)
+ public void invoke(string operation, Ice.CompressBatch compressBatch)
{
_observer = ObserverHelper.get(instance_, operation);
if(_observer != null)
{
_observer.attach();
}
- instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this);
- instance_.objectAdapterFactory().flushAsyncBatchRequests(this);
+ instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this);
+ instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this);
check(true);
}
diff --git a/csharp/src/Ice/Reference.cs b/csharp/src/Ice/Reference.cs
index cfd1019cc74..f786dd5e18e 100644
--- a/csharp/src/Ice/Reference.cs
+++ b/csharp/src/Ice/Reference.cs
@@ -237,6 +237,25 @@ namespace IceInternal
}
}
+ public bool getCompressOverride(out bool compress)
+ {
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress = defaultsAndOverrides.overrideCompressValue;
+ }
+ else if(overrideCompress_)
+ {
+ compress = compress_;
+ }
+ else
+ {
+ compress = false;
+ return false;
+ }
+ return true;
+ }
+
public abstract bool isIndirect();
public abstract bool isWellKnown();
@@ -709,7 +728,7 @@ namespace IceInternal
_fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- bool compress;
+ bool compress = false;
if(defaultsAndOverrides.overrideCompress)
{
compress = defaultsAndOverrides.overrideCompressValue;
@@ -718,10 +737,6 @@ namespace IceInternal
{
compress = compress_;
}
- else
- {
- compress = _fixedConnection.endpoint().compress();
- }
return proxy.iceSetRequestHandler(new ConnectionRequestHandler(this, _fixedConnection, compress));
}
diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs
index 0a515a0b2e9..d6f4f68e4d3 100644
--- a/csharp/test/Ice/ami/AllTests.cs
+++ b/csharp/test/Ice/ami/AllTests.cs
@@ -2147,6 +2147,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
SentCallback cb = new SentCallback();
Task t = b1.ice_getConnection().flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
progress:new Progress(
sentSyncrhonously =>
{
@@ -2168,6 +2169,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
Task t = b1.ice_getConnection().flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
progress: new Progress(
sentSynchronously =>
{
@@ -2189,7 +2191,10 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent());
@@ -2207,7 +2212,10 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushExCallback cb = new FlushExCallback(cookie);
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(!r.isSent());
@@ -2225,7 +2233,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests();
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2244,7 +2252,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushExCallback cb = new FlushExCallback();
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests();
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2274,6 +2282,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2300,6 +2309,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushExCallback cb = new FlushExCallback(cookie);
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2325,7 +2335,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests();
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -2352,7 +2362,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushExCallback cb = new FlushExCallback();
- Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests();
+ Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -2386,6 +2396,7 @@ public class AllTests : TestCommon.AllTests
SentCallback cb = new SentCallback();
Task t = communicator.flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
progress: new Progress(
sentSynchronously =>
{
@@ -2407,6 +2418,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
SentCallback cb = new SentCallback();
Task t = communicator.flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
progress:new Progress(
sentSynchronously =>
{
@@ -2435,6 +2447,7 @@ public class AllTests : TestCommon.AllTests
SentCallback cb = new SentCallback();
Task t = communicator.flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
new Progress(sentSynchronously =>
{
cb.sent(sentSynchronously);
@@ -2462,6 +2475,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
SentCallback cb = new SentCallback();
Task t = communicator.flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
new Progress(
sentSynchronously =>
{
@@ -2490,6 +2504,7 @@ public class AllTests : TestCommon.AllTests
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
SentCallback cb = new SentCallback();
Task t = communicator.flushBatchRequestsAsync(
+ Ice.CompressBatch.BasedOnProxy,
new Progress(
sentSynchronously =>
{
@@ -2512,7 +2527,9 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent());
@@ -2530,7 +2547,9 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent()); // Exceptions are ignored!
@@ -2554,7 +2573,9 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b2.opBatch();
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent());
@@ -2579,7 +2600,9 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent()); // Exceptions are ignored!
@@ -2604,7 +2627,9 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
- Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie);
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ cb.completedAsync,
+ cookie);
r.whenSent(cb.sentAsync);
cb.check();
test(r.isSent()); // Exceptions are ignored!
@@ -2622,7 +2647,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2641,7 +2666,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2665,7 +2690,7 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b2.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2691,7 +2716,7 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2717,7 +2742,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(cb.exception);
r.whenSent(cb.sent);
cb.check();
@@ -2744,6 +2769,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2770,6 +2796,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2801,6 +2828,7 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2834,6 +2862,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2867,6 +2896,7 @@ public class AllTests : TestCommon.AllTests
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback(cookie);
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
(Ice.AsyncResult result) =>
{
cb.completedAsync(result);
@@ -2892,7 +2922,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -2919,7 +2949,7 @@ public class AllTests : TestCommon.AllTests
b1.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -2951,7 +2981,7 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b2.opBatch();
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -2985,7 +3015,7 @@ public class AllTests : TestCommon.AllTests
b2.opBatch();
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -3019,7 +3049,7 @@ public class AllTests : TestCommon.AllTests
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
FlushCallback cb = new FlushCallback();
- Ice.AsyncResult r = communicator.begin_flushBatchRequests();
+ Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
r.whenCompleted(
(Ice.Exception ex) =>
{
@@ -3254,7 +3284,7 @@ public class AllTests : TestCommon.AllTests
Ice.Connection con = p.ice_getConnection();
p2 = p.ice_batchOneway() as Test.TestIntfPrx;
p2.ice_ping();
- r = con.begin_flushBatchRequests();
+ r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
test(r.getConnection() == con);
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
@@ -3265,7 +3295,7 @@ public class AllTests : TestCommon.AllTests
//
p2 = p.ice_batchOneway() as Test.TestIntfPrx;
p2.ice_ping();
- r = communicator.begin_flushBatchRequests();
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
test(r.getConnection() == null); // Expected
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
diff --git a/csharp/test/Ice/operations/BatchOneways.cs b/csharp/test/Ice/operations/BatchOneways.cs
index ee21049b0c7..1eb020416ba 100644
--- a/csharp/test/Ice/operations/BatchOneways.cs
+++ b/csharp/test/Ice/operations/BatchOneways.cs
@@ -162,5 +162,41 @@ class BatchOneways
ic.destroy();
}
+
+ p.ice_ping();
+ if(p.ice_getConnection() != null &&
+ p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").Equals(""))
+ {
+ Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway();
+
+ Test.MyClassPrx batchC1 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_compress(false));
+ Test.MyClassPrx batchC2 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_compress(true));
+ Test.MyClassPrx batchC3 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_identity(identity));
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.Yes);
+
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.No);
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC3.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+ }
}
}
diff --git a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java
index e4eea1ecf6d..6eda6d0aeaf 100644
--- a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java
+++ b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java
@@ -212,39 +212,40 @@ public final class CommunicatorI implements Communicator
@Override
public void
- flushBatchRequests()
+ flushBatchRequests(Ice.CompressBatch compressBatch)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compressBatch));
}
@Override
public AsyncResult
- begin_flushBatchRequests()
+ begin_flushBatchRequests(Ice.CompressBatch compressBatch)
{
- return begin_flushBatchRequestsInternal(null);
+ return begin_flushBatchRequestsInternal(compressBatch, null);
}
@Override
public AsyncResult
- begin_flushBatchRequests(Callback cb)
+ begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb)
{
- return begin_flushBatchRequestsInternal(cb);
+ return begin_flushBatchRequestsInternal(compressBatch, cb);
}
@Override
public AsyncResult
- begin_flushBatchRequests(Callback_Communicator_flushBatchRequests cb)
+ begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback_Communicator_flushBatchRequests cb)
{
- return begin_flushBatchRequestsInternal(cb);
+ return begin_flushBatchRequestsInternal(compressBatch, cb);
}
@Override
public AsyncResult
- begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb,
+ begin_flushBatchRequests(Ice.CompressBatch compressBatch,
+ IceInternal.Functional_VoidCallback responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
IceInternal.Functional_BoolCallback sentCb)
{
- return begin_flushBatchRequestsInternal(
+ return begin_flushBatchRequestsInternal(compressBatch,
new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb)
{
@Override
@@ -265,7 +266,7 @@ public final class CommunicatorI implements Communicator
private static final String _flushBatchRequests_name = "flushBatchRequests";
private Ice.AsyncResult
- begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch, IceInternal.CallbackBase cb)
{
IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory();
IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory();
@@ -279,8 +280,8 @@ public final class CommunicatorI implements Communicator
_flushBatchRequests_name,
cb);
- connectionFactory.flushAsyncBatchRequests(result);
- adapterFactory.flushAsyncBatchRequests(result);
+ connectionFactory.flushAsyncBatchRequests(compressBatch, result);
+ adapterFactory.flushAsyncBatchRequests(compressBatch, result);
//
// Inform the callback that we have finished initiating all of the
diff --git a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
index 84131a38cfc..0130c676dd5 100644
--- a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
+++ b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
@@ -420,58 +420,62 @@ public final class ConnectionI extends IceInternal.EventHandler
}
@Override
- public void flushBatchRequests()
+ public void flushBatchRequests(Ice.CompressBatch compressBatch)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compressBatch));
}
private static final String _flushBatchRequests_name = "flushBatchRequests";
@Override
- public Ice.AsyncResult begin_flushBatchRequests()
+ public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch)
{
- return begin_flushBatchRequestsInternal(null);
+ return begin_flushBatchRequestsInternal(compressBatch, null);
}
@Override
- public Ice.AsyncResult begin_flushBatchRequests(Callback cb)
+ public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb)
{
- return begin_flushBatchRequestsInternal(cb);
+ return begin_flushBatchRequestsInternal(compressBatch, cb);
}
@Override
- public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
+ public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch,
+ Callback_Connection_flushBatchRequests cb)
{
- return begin_flushBatchRequestsInternal(cb);
+ return begin_flushBatchRequestsInternal(compressBatch, cb);
}
@Override
- public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb,
- IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
- IceInternal.Functional_BoolCallback sentCb)
- {
- return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb)
- {
- @Override
- public final void _iceCompleted(AsyncResult result)
- {
- try
- {
- result.getConnection().end_flushBatchRequests(result);
- }
- catch(Exception ex)
- {
- _exceptionCb.apply(ex);
- }
- }
- });
- }
-
- private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch,
+ IceInternal.Functional_VoidCallback responseCb,
+ IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
+ IceInternal.Functional_BoolCallback sentCb)
+ {
+ return begin_flushBatchRequestsInternal(compressBatch,
+ new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb)
+ {
+ @Override
+ public final void _iceCompleted(AsyncResult result)
+ {
+ try
+ {
+ result.getConnection().end_flushBatchRequests(result);
+ }
+ catch(Exception ex)
+ {
+ _exceptionCb.apply(ex);
+ }
+ }
+ });
+ }
+
+ private Ice.AsyncResult begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch,
+ IceInternal.CallbackBase cb)
{
IceInternal.ConnectionFlushBatch result =
new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, _flushBatchRequests_name, cb);
- result.invoke();
+ result.invoke(compressBatch);
return result;
}
diff --git a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java
index 9ab45240784..4b4029abbd2 100644
--- a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java
+++ b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java
@@ -746,7 +746,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
public void
- flushAsyncBatchRequests(IceInternal.CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(Ice.CompressBatch compressBatch, IceInternal.CommunicatorFlushBatch outAsync)
{
List<IncomingConnectionFactory> f;
synchronized(this)
@@ -755,7 +755,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
for(IncomingConnectionFactory p : f)
{
- p.flushAsyncBatchRequests(outAsync);
+ p.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java
index 3db2ac67177..6d7f2030ac0 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java
@@ -23,7 +23,7 @@ public class BatchRequestQueue
@Override
public void enqueue()
{
- enqueueBatchRequest();
+ enqueueBatchRequest(_proxy);
}
@Override
@@ -59,6 +59,7 @@ public class BatchRequestQueue
_batchStream = new Ice.OutputStream(instance, Protocol.currentProtocolEncoding);
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
+ _batchCompress = false;
_request = new BatchRequestI();
_maxSize = instance.batchAutoFlushSize();
@@ -112,6 +113,11 @@ public class BatchRequestQueue
}
else
{
+ Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride();
+ if(compress != null)
+ {
+ _batchCompress |= compress.booleanValue();
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -141,7 +147,7 @@ public class BatchRequestQueue
}
synchronized public int
- swap(Ice.OutputStream os)
+ swap(Ice.OutputStream os, Ice.BooleanHolder compress)
{
if(_batchRequestNum == 0)
{
@@ -161,12 +167,17 @@ public class BatchRequestQueue
}
int requestNum = _batchRequestNum;
+ if(compress != null)
+ {
+ compress.value = _batchCompress;
+ }
_batchStream.swap(os);
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
if(lastRequest != null)
@@ -218,9 +229,14 @@ public class BatchRequestQueue
}
}
- private void enqueueBatchRequest()
+ private void enqueueBatchRequest(Ice.ObjectPrx proxy)
{
assert(_batchMarker < _batchStream.size());
+ Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride();
+ if(compress != null)
+ {
+ _batchCompress |= compress.booleanValue();
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -231,6 +247,7 @@ public class BatchRequestQueue
private boolean _batchStreamCanFlush;
private int _batchRequestNum;
private int _batchMarker;
+ private boolean _batchCompress;
private BatchRequestI _request;
private Ice.LocalException _exception;
private int _maxSize;
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
index 309c9dd6a55..d6da14b8f24 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java
@@ -43,7 +43,7 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
_useCount = 1;
}
- public void flushConnection(final Ice.ConnectionI con)
+ public void flushConnection(final Ice.ConnectionI con, final Ice.CompressBatch compressBatch)
{
class FlushBatch extends OutgoingAsyncBase
{
@@ -96,7 +96,8 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
try
{
final FlushBatch flushBatch = new FlushBatch();
- final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
+ final Ice.BooleanHolder compress = new Ice.BooleanHolder();
+ final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), compress);
if(batchRequestNum == 0)
{
flushBatch.sent();
@@ -108,14 +109,40 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI
@Override
public Void call() throws RetryException
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ boolean comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress.value;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
return null;
}
});
}
else
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ boolean comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress.value;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
}
}
catch(RetryException ex)
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
index 7824d3154c1..e1839020fa6 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java
@@ -42,12 +42,12 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase
return _connection;
}
- public void invoke()
+ public void invoke(final Ice.CompressBatch compressBatch)
{
try
{
- final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os);
-
+ final Ice.BooleanHolder compress = new Ice.BooleanHolder();
+ final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os, compress);
int status;
if(batchRequestNum == 0)
{
@@ -64,13 +64,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase
@Override
public Integer call() throws RetryException
{
- return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum);
+ boolean comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress.value;
+ }
+ return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, batchRequestNum);
}
});
}
else
{
- status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
+ boolean comp;
+ if(compressBatch == Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = compress.value;
+ }
+ status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
}
if((status & AsyncStatus.Sent) > 0)
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java
index ebc59c5e690..b9e84ce5d0e 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java
@@ -250,7 +250,7 @@ public class FixedReference extends Reference
_fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- boolean compress;
+ boolean compress = false;
if(defaultsAndOverrides.overrideCompress)
{
compress = defaultsAndOverrides.overrideCompressValue;
@@ -259,10 +259,6 @@ public class FixedReference extends Reference
{
compress = _compress;
}
- else
- {
- compress = _fixedConnection.endpoint().compress();
- }
RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress);
if(getInstance().queueRequests())
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java
index 108df4607e0..60514dfd230 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java
@@ -196,13 +196,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here.
{
try
{
- outAsync.flushConnection(c);
+ outAsync.flushConnection(c, compressBatch);
}
catch(Ice.LocalException ex)
{
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java
index bfec67de6a6..c44a0b2ef40 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java
@@ -218,7 +218,7 @@ public final class ObjectAdapterFactory
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
java.util.List<Ice.ObjectAdapterI> adapters;
synchronized(this)
@@ -228,7 +228,7 @@ public final class ObjectAdapterFactory
for(Ice.ObjectAdapterI adapter : adapters)
{
- adapter.flushAsyncBatchRequests(outAsync);
+ adapter.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java
index b51856cf3e6..3102f3a500f 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java
@@ -271,7 +271,7 @@ public final class OutgoingConnectionFactory
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>();
@@ -296,7 +296,7 @@ public final class OutgoingConnectionFactory
{
try
{
- outAsync.flushConnection(conn);
+ outAsync.flushConnection(conn, compressBatch);
}
catch(Ice.LocalException ex)
{
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
index 7a53d1b2c24..c83f46d964a 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java
@@ -28,7 +28,7 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase
{
super(prx, operation, callback);
_observer = ObserverHelper.get(prx, operation);
- _batchRequestNum = prx._getBatchRequestQueue().swap(_os);
+ _batchRequestNum = prx._getBatchRequestQueue().swap(_os, null);
}
@Override
diff --git a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java
index a1869b1fcda..c3b0e1b8d76 100644
--- a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java
+++ b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java
@@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable
return _hashValue;
}
+ public java.lang.Boolean
+ getCompressOverride()
+ {
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue);
+ }
+ else if(_overrideCompress)
+ {
+ return Boolean.valueOf(_compress);
+ }
+ return null; // Null indicates that compress is not overriden.
+ }
+
//
// Utility methods
//
diff --git a/java-compat/test/src/main/java/test/Ice/ami/AMI.java b/java-compat/test/src/main/java/test/Ice/ami/AMI.java
index c707f20e6c7..e3d3b792338 100644
--- a/java-compat/test/src/main/java/test/Ice/ami/AMI.java
+++ b/java-compat/test/src/main/java/test/Ice/ami/AMI.java
@@ -1933,6 +1933,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -1964,6 +1965,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushExCallback cb = new FlushExCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -1995,6 +1997,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Connection_flushBatchRequests()
{
@Override
@@ -2026,6 +2029,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushExCallback cb = new FlushExCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Connection_flushBatchRequests()
{
@Override
@@ -2062,6 +2066,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -2093,6 +2098,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -2129,6 +2135,7 @@ public class AMI
b2.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -2167,6 +2174,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -2205,6 +2213,7 @@ public class AMI
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback()
{
@Override
@@ -2236,6 +2245,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Communicator_flushBatchRequests()
{
@Override
@@ -2267,6 +2277,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Communicator_flushBatchRequests()
{
@Override
@@ -2303,6 +2314,7 @@ public class AMI
b2.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Communicator_flushBatchRequests()
{
@Override
@@ -2341,6 +2353,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Communicator_flushBatchRequests()
{
@Override
@@ -2379,6 +2392,7 @@ public class AMI
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
new Ice.Callback_Communicator_flushBatchRequests()
{
@Override
@@ -2500,7 +2514,7 @@ public class AMI
Ice.Connection con = p.ice_getConnection();
p2 = (TestIntfPrx)p.ice_batchOneway();
p2.ice_ping();
- r = con.begin_flushBatchRequests();
+ r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
test(r.getConnection() == con);
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
@@ -2511,7 +2525,7 @@ public class AMI
//
p2 = (TestIntfPrx)p.ice_batchOneway();
p2.ice_ping();
- r = communicator.begin_flushBatchRequests();
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
test(r.getConnection() == null); // Expected
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
diff --git a/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java b/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java
index 8d886c20d78..f64ea6dda04 100644
--- a/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java
+++ b/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java
@@ -875,6 +875,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -885,6 +886,7 @@ public class AMI
final FlushCallback cb2 = new FlushCallback();
Ice.AsyncResult r2 = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb2.exception(ex),
(boolean sentSynchronously) -> cb2.sent(sentSynchronously));
@@ -904,6 +906,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushExCallback cb = new FlushExCallback();
Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -929,6 +932,7 @@ public class AMI
b1.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -949,6 +953,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -974,6 +979,7 @@ public class AMI
b2.opBatch();
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -1001,6 +1007,7 @@ public class AMI
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
@@ -1028,6 +1035,7 @@ public class AMI
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait);
final FlushCallback cb = new FlushCallback();
Ice.AsyncResult r = communicator.begin_flushBatchRequests(
+ Ice.CompressBatch.BasedOnProxy,
null,
(Ice.Exception ex) -> cb.exception(ex),
(boolean sentSynchronously) -> cb.sent(sentSynchronously));
diff --git a/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java b/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java
index ed523af5dbe..379c64d1032 100644
--- a/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java
+++ b/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java
@@ -476,7 +476,7 @@ public class AllTests
p2.op();
p2.op();
- AsyncResult r = p2.ice_getConnection().begin_flushBatchRequests();
+ AsyncResult r = p2.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
mainThread.interrupt();
try
{
@@ -495,7 +495,8 @@ public class AllTests
final CallbackBase cb = new CallbackBase();
Ice.Connection con = p2.ice_getConnection();
mainThread.interrupt();
- con.begin_flushBatchRequests(new Callback_Connection_flushBatchRequests()
+ con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ new Callback_Connection_flushBatchRequests()
{
@Override
public void sent(boolean sentSynchronously)
@@ -525,7 +526,7 @@ public class AllTests
p2.op();
p2.op();
- AsyncResult r = communicator.begin_flushBatchRequests();
+ AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
mainThread.interrupt();
try
{
@@ -543,7 +544,8 @@ public class AllTests
final CallbackBase cb = new CallbackBase();
mainThread.interrupt();
- communicator.begin_flushBatchRequests(new Callback_Communicator_flushBatchRequests()
+ communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ new Callback_Communicator_flushBatchRequests()
{
@Override
public void sent(boolean sentSynchronously)
diff --git a/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java b/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java
index d3240fc5a06..5e62fe3c230 100644
--- a/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java
+++ b/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java
@@ -79,6 +79,11 @@ class BatchOneways
MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
batch.ice_flushBatchRequests(); // Empty flush
+ if(batch.ice_getConnection() != null)
+ {
+ batch.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+ }
+ batch.ice_getCommunicator().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
p.opByteSOnewayCallCount(); // Reset the call count
@@ -177,5 +182,41 @@ class BatchOneways
ic.destroy();
}
+
+ p.ice_ping();
+ if(p.ice_getConnection() != null &&
+ p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").equals(""))
+ {
+ Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway();
+
+ MyClassPrx batchC1 = MyClassPrxHelper.uncheckedCast(prx.ice_compress(false));
+ MyClassPrx batchC2 = MyClassPrxHelper.uncheckedCast(prx.ice_compress(true));
+ MyClassPrx batchC3 = MyClassPrxHelper.uncheckedCast(prx.ice_identity(identity));
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.Yes);
+
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.No);
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC3.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy);
+ }
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java
index 90d1d89467f..48cfd306144 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java
@@ -211,18 +211,18 @@ public final class CommunicatorI implements Communicator
}
@Override
- public void flushBatchRequests()
+ public void flushBatchRequests(CompressBatch compressBatch)
{
- _iceI_flushBatchRequestsAsync().waitForResponse();
+ _iceI_flushBatchRequestsAsync(compressBatch).waitForResponse();
}
@Override
- public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync()
+ public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch)
{
- return _iceI_flushBatchRequestsAsync();
+ return _iceI_flushBatchRequestsAsync(compressBatch);
}
- public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync()
+ public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync(CompressBatch compressBatch)
{
com.zeroc.IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory();
com.zeroc.IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory();
@@ -234,8 +234,8 @@ public final class CommunicatorI implements Communicator
com.zeroc.IceInternal.CommunicatorFlushBatch f =
new com.zeroc.IceInternal.CommunicatorFlushBatch(this, _instance);
- connectionFactory.flushAsyncBatchRequests(f);
- adapterFactory.flushAsyncBatchRequests(f);
+ connectionFactory.flushAsyncBatchRequests(compressBatch, f);
+ adapterFactory.flushAsyncBatchRequests(compressBatch, f);
//
// Inform the callback that we have finished initiating all of the
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java
index 13b2d7a1dcc..4634a5ca4ec 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java
@@ -344,8 +344,7 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler
}
synchronized public int
- sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response,
- int batchRequestNum)
+ sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum)
throws com.zeroc.IceInternal.RetryException
{
final OutputStream os = out.getOs();
@@ -431,17 +430,17 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler
}
@Override
- public void flushBatchRequests()
+ public void flushBatchRequests(CompressBatch compressBatch)
{
- ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync());
+ ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync(compressBatch));
}
@Override
- public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync()
+ public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch)
{
com.zeroc.IceInternal.ConnectionFlushBatch f =
new com.zeroc.IceInternal.ConnectionFlushBatch(this, _communicator, _instance);
- f.invoke();
+ f.invoke(compressBatch);
return f;
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java
index de83357a228..2149b60a207 100644
--- a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java
@@ -746,7 +746,8 @@ public final class ObjectAdapterI implements ObjectAdapter
}
public void
- flushAsyncBatchRequests(com.zeroc.IceInternal.CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch,
+ com.zeroc.IceInternal.CommunicatorFlushBatch outAsync)
{
List<IncomingConnectionFactory> f;
synchronized(this)
@@ -755,7 +756,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
for(IncomingConnectionFactory p : f)
{
- p.flushAsyncBatchRequests(outAsync);
+ p.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java
index 3bb4d920c16..ed776c67636 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java
@@ -23,7 +23,7 @@ public class BatchRequestQueue
@Override
public void enqueue()
{
- enqueueBatchRequest();
+ enqueueBatchRequest(_proxy);
}
@Override
@@ -59,6 +59,7 @@ public class BatchRequestQueue
_batchStream = new com.zeroc.Ice.OutputStream(instance, Protocol.currentProtocolEncoding);
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
+ _batchCompress = false;
_request = new BatchRequestI();
_maxSize = instance.batchAutoFlushSize();
@@ -112,6 +113,11 @@ public class BatchRequestQueue
}
else
{
+ Boolean compress = proxy._getReference().getCompressOverride();
+ if(compress != null)
+ {
+ _batchCompress |= compress.booleanValue();
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -140,12 +146,18 @@ public class BatchRequestQueue
}
}
- synchronized public int
+ public class SwapResult
+ {
+ public int batchRequestNum;
+ public boolean compress;
+ };
+
+ synchronized public SwapResult
swap(com.zeroc.Ice.OutputStream os)
{
if(_batchRequestNum == 0)
{
- return 0;
+ return null;
}
waitStreamInUse(true);
@@ -160,20 +172,23 @@ public class BatchRequestQueue
_batchStream.resize(_batchMarker);
}
- int requestNum = _batchRequestNum;
+ SwapResult result = new SwapResult();
+ result.batchRequestNum = _batchRequestNum;
+ result.compress = _batchCompress;
_batchStream.swap(os);
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(Protocol.requestBatchHdr);
_batchMarker = _batchStream.size();
if(lastRequest != null)
{
_batchStream.writeBlob(lastRequest);
}
- return requestNum;
+ return result;
}
synchronized public void
@@ -218,9 +233,14 @@ public class BatchRequestQueue
}
}
- private void enqueueBatchRequest()
+ private void enqueueBatchRequest(com.zeroc.Ice.ObjectPrx proxy)
{
assert(_batchMarker < _batchStream.size());
+ Boolean compress = proxy._getReference().getCompressOverride();
+ if(compress != null)
+ {
+ _batchCompress |= compress.booleanValue();
+ }
_batchMarker = _batchStream.size();
++_batchRequestNum;
}
@@ -231,6 +251,7 @@ public class BatchRequestQueue
private boolean _batchStreamCanFlush;
private int _batchRequestNum;
private int _batchMarker;
+ private boolean _batchCompress;
private BatchRequestI _request;
private com.zeroc.Ice.LocalException _exception;
private int _maxSize;
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java
index a7c4dc65b38..c6193bb48aa 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java
@@ -36,7 +36,7 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void>
complete(null);
}
- public void flushConnection(final com.zeroc.Ice.ConnectionI con)
+ public void flushConnection(final com.zeroc.Ice.ConnectionI con, final com.zeroc.Ice.CompressBatch compressBatch)
{
class FlushBatch extends OutgoingAsyncBaseI<Void>
{
@@ -106,8 +106,8 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void>
try
{
final FlushBatch flushBatch = new FlushBatch();
- final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs());
- if(batchRequestNum == 0)
+ final BatchRequestQueue.SwapResult r = con.getBatchRequestQueue().swap(flushBatch.getOs());
+ if(r == null)
{
flushBatch.sent();
}
@@ -118,14 +118,40 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void>
@Override
public Void call() throws RetryException
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ boolean comp = false;
+ if(compressBatch == com.zeroc.Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == com.zeroc.Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = r.compress;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum);
return null;
}
});
}
else
{
- con.sendAsyncRequest(flushBatch, false, false, batchRequestNum);
+ boolean comp = false;
+ if(compressBatch == com.zeroc.Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == com.zeroc.Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = r.compress;
+ }
+ con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum);
}
}
catch(RetryException ex)
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java
index 2d4a35a0f82..588905e9ba5 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java
@@ -45,14 +45,13 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void>
super.markCompleted();
}
- public void invoke()
+ public void invoke(com.zeroc.Ice.CompressBatch compressBatch)
{
try
{
- final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os);
-
+ final BatchRequestQueue.SwapResult r = _connection.getBatchRequestQueue().swap(_os);
int status;
- if(batchRequestNum == 0)
+ if(r == null)
{
status = AsyncStatus.Sent;
if(sent())
@@ -68,13 +67,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void>
public Integer call()
throws RetryException
{
- return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum);
+ boolean comp = false;
+ if(compressBatch == com.zeroc.Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == com.zeroc.Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = r.compress;
+ }
+ return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, r.batchRequestNum);
}
});
}
else
{
- status = _connection.sendAsyncRequest(this, false, false, batchRequestNum);
+ boolean comp = false;
+ if(compressBatch == com.zeroc.Ice.CompressBatch.Yes)
+ {
+ comp = true;
+ }
+ else if(compressBatch == com.zeroc.Ice.CompressBatch.No)
+ {
+ comp = false;
+ }
+ else
+ {
+ comp = r.compress;
+ }
+ status = _connection.sendAsyncRequest(this, comp, false, r.batchRequestNum);
}
if((status & AsyncStatus.Sent) > 0)
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java
index cfa3d1e871b..a2a3de32109 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java
@@ -251,7 +251,7 @@ public class FixedReference extends Reference
_fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- boolean compress;
+ boolean compress = false;
if(defaultsAndOverrides.overrideCompress)
{
compress = defaultsAndOverrides.overrideCompressValue;
@@ -260,10 +260,6 @@ public class FixedReference extends Reference
{
compress = _compress;
}
- else
- {
- compress = _fixedConnection.endpoint().compress();
- }
RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress);
if(getInstance().queueRequests())
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java
index 98dd813fb66..2aacbaebdc3 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java
@@ -192,13 +192,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Con
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
for(ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here.
{
try
{
- outAsync.flushConnection(c);
+ outAsync.flushConnection(c, compressBatch);
}
catch(com.zeroc.Ice.LocalException ex)
{
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java
index 94d1417a663..ca27509d5cc 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java
@@ -221,7 +221,7 @@ public final class ObjectAdapterFactory
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
java.util.List<ObjectAdapterI> adapters;
synchronized(this)
@@ -231,7 +231,7 @@ public final class ObjectAdapterFactory
for(ObjectAdapterI adapter : adapters)
{
- adapter.flushAsyncBatchRequests(outAsync);
+ adapter.flushAsyncBatchRequests(compressBatch, outAsync);
}
}
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java
index 9bd778ecfb4..99f29b8cbd1 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java
@@ -277,7 +277,7 @@ public final class OutgoingConnectionFactory
}
public void
- flushAsyncBatchRequests(CommunicatorFlushBatch outAsync)
+ flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync)
{
java.util.List<ConnectionI> c = new java.util.LinkedList<>();
@@ -302,7 +302,7 @@ public final class OutgoingConnectionFactory
{
try
{
- outAsync.flushConnection(conn);
+ outAsync.flushConnection(conn, compressBatch);
}
catch(LocalException ex)
{
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java
index 527a26658bd..870b6be3099 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java
@@ -15,7 +15,8 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBaseI<Void>
{
super(prx, "ice_flushBatchRequests");
_observer = ObserverHelper.get(prx, "ice_flushBatchRequests");
- _batchRequestNum = prx._getBatchRequestQueue().swap(_os);
+ BatchRequestQueue.SwapResult r = prx._getBatchRequestQueue().swap(_os);
+ _batchRequestNum = r != null ? r.batchRequestNum : 0;
}
@Override
diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java
index 51cfc65c06e..43a9befc735 100644
--- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java
+++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java
@@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable
return _hashValue;
}
+ public java.lang.Boolean
+ getCompressOverride()
+ {
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue);
+ }
+ else if(_overrideCompress)
+ {
+ return Boolean.valueOf(_compress);
+ }
+ return null; // Null indicates that compress is not overriden.
+ }
+
//
// Utility methods
//
diff --git a/java/test/src/main/java/test/Ice/ami/AMI.java b/java/test/src/main/java/test/Ice/ami/AMI.java
index 0c6b125897a..bc47eb8882f 100644
--- a/java/test/src/main/java/test/Ice/ami/AMI.java
+++ b/java/test/src/main/java/test/Ice/ami/AMI.java
@@ -16,6 +16,7 @@ import java.util.concurrent.CompletionException;
import com.zeroc.Ice.InvocationFuture;
import com.zeroc.Ice.Util;
+import com.zeroc.Ice.CompressBatch;
import test.Ice.ami.Test.CloseMode;
import test.Ice.ami.Test.TestIntfPrx;
@@ -375,7 +376,8 @@ public class AMI
ice_batchOneway();
b1.opBatch();
b1.opBatch();
- CompletableFuture<Void> r = b1.ice_getConnection().flushBatchRequestsAsync();
+ CompletableFuture<Void> r =
+ b1.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -397,7 +399,8 @@ public class AMI
ice_batchOneway();
b1.opBatch();
b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait);
- CompletableFuture<Void> r = b1.ice_getConnection().flushBatchRequestsAsync();
+ CompletableFuture<Void> r =
+ b1.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex != null);
@@ -424,7 +427,7 @@ public class AMI
ice_batchOneway();
b1.opBatch();
b1.opBatch();
- CompletableFuture<Void> r = communicator.flushBatchRequestsAsync();
+ CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -446,7 +449,7 @@ public class AMI
ice_batchOneway();
b1.opBatch();
b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait);
- CompletableFuture<Void> r = communicator.flushBatchRequestsAsync();
+ CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -473,7 +476,7 @@ public class AMI
b1.opBatch();
b2.opBatch();
b2.opBatch();
- CompletableFuture<Void> r = communicator.flushBatchRequestsAsync();
+ CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -502,7 +505,7 @@ public class AMI
b1.opBatch();
b2.opBatch();
b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait);
- CompletableFuture<Void> r = communicator.flushBatchRequestsAsync();
+ CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -531,7 +534,7 @@ public class AMI
b2.opBatch();
b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait);
b2.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait);
- CompletableFuture<Void> r = communicator.flushBatchRequestsAsync();
+ CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
test(ex == null);
@@ -654,7 +657,8 @@ public class AMI
com.zeroc.Ice.Connection con = p.ice_getConnection();
TestIntfPrx p2 = p.ice_batchOneway();
p2.ice_ping();
- InvocationFuture<Void> r = Util.getInvocationFuture(con.flushBatchRequestsAsync());
+ InvocationFuture<Void> r =
+ Util.getInvocationFuture(con.flushBatchRequestsAsync(CompressBatch.BasedOnProxy));
test(r.getConnection() == con);
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
@@ -665,7 +669,7 @@ public class AMI
//
p2 = p.ice_batchOneway();
p2.ice_ping();
- r = Util.getInvocationFuture(communicator.flushBatchRequestsAsync());
+ r = Util.getInvocationFuture(communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy));
test(r.getConnection() == null); // Expected
test(r.getCommunicator() == communicator);
test(r.getProxy() == null); // Expected
diff --git a/java/test/src/main/java/test/Ice/interrupt/AllTests.java b/java/test/src/main/java/test/Ice/interrupt/AllTests.java
index 62a1d6034e1..c7b00170209 100644
--- a/java/test/src/main/java/test/Ice/interrupt/AllTests.java
+++ b/java/test/src/main/java/test/Ice/interrupt/AllTests.java
@@ -21,6 +21,7 @@ import com.zeroc.Ice.Connection;
import com.zeroc.Ice.LocalException;
import com.zeroc.Ice.Util;
import com.zeroc.Ice.InvocationFuture;
+import com.zeroc.Ice.CompressBatch;
import test.Ice.interrupt.Test.CannotInterruptException;
import test.Ice.interrupt.Test.TestIntfControllerPrx;
@@ -435,7 +436,7 @@ public class AllTests
p2.op();
p2.op();
- r = p2.ice_getConnection().flushBatchRequestsAsync();
+ r = p2.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
mainThread.interrupt();
try
{
@@ -462,7 +463,7 @@ public class AllTests
final Callback cb = new Callback();
com.zeroc.Ice.Connection con = p2.ice_getConnection();
mainThread.interrupt();
- r = con.flushBatchRequestsAsync();
+ r = con.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
r.whenComplete((result, ex) -> test(ex == null));
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
@@ -486,7 +487,7 @@ public class AllTests
p2.op();
p2.op();
- r = communicator.flushBatchRequestsAsync();
+ r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
mainThread.interrupt();
try
{
@@ -512,7 +513,7 @@ public class AllTests
final Callback cb = new Callback();
mainThread.interrupt();
- r = communicator.flushBatchRequestsAsync();
+ r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy);
r.whenComplete((result, ex) -> test(ex == null));
Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) ->
{
diff --git a/java/test/src/main/java/test/Ice/operations/BatchOneways.java b/java/test/src/main/java/test/Ice/operations/BatchOneways.java
index 30a5672e80d..be982010978 100644
--- a/java/test/src/main/java/test/Ice/operations/BatchOneways.java
+++ b/java/test/src/main/java/test/Ice/operations/BatchOneways.java
@@ -73,6 +73,11 @@ class BatchOneways
MyClassPrx batch = p.ice_batchOneway();
batch.ice_flushBatchRequests(); // Empty flush
+ if(batch.ice_getConnection() != null)
+ {
+ batch.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy);
+ }
+ batch.ice_getCommunicator().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy);
p.opByteSOnewayCallCount(); // Reset the call count
@@ -171,5 +176,41 @@ class BatchOneways
ic.destroy();
}
+
+ p.ice_ping();
+ if(p.ice_getConnection() != null &&
+ p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").equals(""))
+ {
+ com.zeroc.Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway();
+
+ MyClassPrx batchC1 = MyClassPrx.uncheckedCast(prx.ice_compress(false));
+ MyClassPrx batchC2 = MyClassPrx.uncheckedCast(prx.ice_compress(true));
+ MyClassPrx batchC3 = MyClassPrx.uncheckedCast(prx.ice_identity(identity));
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.Yes);
+
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.No);
+
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC2.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy);
+
+ batchC1.opByteSOneway(bs1);
+ batchC3.opByteSOneway(bs1);
+ batchC1.opByteSOneway(bs1);
+ batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy);
+ }
}
}
diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js
index 6b33f5fb760..6126c277234 100644
--- a/js/src/Ice/ConnectRequestHandler.js
+++ b/js/src/Ice/ConnectRequestHandler.js
@@ -36,7 +36,6 @@ class ConnectRequestHandler
this._initialized = false;
this._connection = null;
- this._compress = false;
this._exception = null;
this._requests = [];
}
@@ -67,7 +66,7 @@ class ConnectRequestHandler
this._requests.push(out);
return AsyncStatus.Queued;
}
- return out.invokeRemote(this._connection, this._compress, this._response);
+ return out.invokeRemote(this._connection, this._response);
}
asyncRequestCanceled(out, ex)
@@ -113,11 +112,11 @@ class ConnectRequestHandler
//
// Implementation of Reference_GetConnectionCallback
//
- setConnection(values)
+ setConnection(connection)
{
Debug.assert(this._exception === null && this._connection === null);
- [this._connection, this._compress] = values;
+ this._connection = connection;
//
// If this proxy is for a non-local object, and we are using a router, then
@@ -213,7 +212,7 @@ class ConnectRequestHandler
{
try
{
- request.invokeRemote(this._connection, this._compress, this._response);
+ request.invokeRemote(this._connection, this._response);
}
catch(ex)
{
@@ -238,7 +237,7 @@ class ConnectRequestHandler
if(this._reference.getCacheConnection() && exception === null)
{
- this._requestHandler = new ConnectionRequestHandler(this._reference, this._connection, this._compress);
+ this._requestHandler = new ConnectionRequestHandler(this._reference, this._connection);
this._proxies.forEach(proxy => proxy._updateRequestHandler(this, this._requestHandler));
}
diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js
index 4541098ae78..4243b438ad9 100644
--- a/js/src/Ice/ConnectionI.js
+++ b/js/src/Ice/ConnectionI.js
@@ -67,7 +67,6 @@ class MessageInfo
this.stream = new InputStream(instance, Protocol.currentProtocolEncoding);
this.invokeNum = 0;
this.requestId = 0;
- this.compress = false;
this.servantManager = null;
this.adapter = null;
this.outAsync = null;
@@ -367,7 +366,7 @@ class ConnectionI
}
}
- sendAsyncRequest(out, compress, response, batchRequestNum)
+ sendAsyncRequest(out, response, batchRequestNum)
{
let requestId = 0;
const ostr = out.getOs();
@@ -424,7 +423,7 @@ class ConnectionI
let status;
try
{
- status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), compress, requestId));
+ status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), requestId));
}
catch(ex)
{
@@ -572,7 +571,7 @@ class ConnectionI
}
}
- sendResponse(os, compressFlag)
+ sendResponse(os)
{
Debug.assert(this._state > StateNotValidated);
@@ -593,7 +592,7 @@ class ConnectionI
throw this._exception;
}
- this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true));
+ this.sendMessage(OutgoingMessage.createForStream(os, true));
if(this._state === StateClosing && this._dispatchCount === 0)
{
@@ -927,8 +926,7 @@ class ConnectionI
if(info.invokeNum > 0)
{
- this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
+ this.invokeAll(info.stream, info.invokeNum, info.requestId, info.servantManager, info.adapter);
//
// Don't increase count, the dispatch count is
@@ -1444,7 +1442,7 @@ class ConnectionI
os.writeByte(0); // compression status: always report 0 for CloseConnection.
os.writeInt(Protocol.headerSize); // Message size.
- if((this.sendMessage(OutgoingMessage.createForStream(os, false, false)) & AsyncStatus.Sent) > 0)
+ if((this.sendMessage(OutgoingMessage.createForStream(os, false)) & AsyncStatus.Sent) > 0)
{
//
// Schedule the close timeout to wait for the peer to close the connection.
@@ -1479,7 +1477,7 @@ class ConnectionI
os.writeInt(Protocol.headerSize); // Message size.
try
{
- this.sendMessage(OutgoingMessage.createForStream(os, false, false));
+ this.sendMessage(OutgoingMessage.createForStream(os, false));
}
catch(ex)
{
@@ -1769,8 +1767,8 @@ class ConnectionI
//
info.stream.pos = 8;
const messageType = info.stream.readByte();
- info.compress = info.stream.readByte();
- if(info.compress === 2)
+ const compress = info.stream.readByte();
+ if(compress === 2)
{
throw new Ice.FeatureNotSupportedException("Cannot uncompress compressed message");
}
@@ -1902,7 +1900,7 @@ class ConnectionI
return info;
}
- invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter)
+ invokeAll(stream, invokeNum, requestId, servantManager, adapter)
{
try
{
@@ -1914,7 +1912,6 @@ class ConnectionI
let inc = new IncomingAsync(this._instance, this,
adapter,
!this._endpoint.datagram() && requestId !== 0, // response
- compress,
requestId);
//
@@ -2102,7 +2099,6 @@ class OutgoingMessage
{
this.stream = null;
this.outAsync = null;
- this.compress = false;
this.requestId = 0;
this.prepared = false;
}
@@ -2140,11 +2136,10 @@ class OutgoingMessage
}
}
- static createForStream(stream, compress, adopt)
+ static createForStream(stream, adopt)
{
const m = new OutgoingMessage();
m.stream = stream;
- m.compress = compress;
m.adopt = adopt;
m.isSent = false;
m.requestId = 0;
@@ -2152,11 +2147,10 @@ class OutgoingMessage
return m;
}
- static create(out, stream, compress, requestId)
+ static create(out, stream, requestId)
{
const m = new OutgoingMessage();
m.stream = stream;
- m.compress = compress;
m.outAsync = out;
m.requestId = requestId;
m.isSent = false;
diff --git a/js/src/Ice/ConnectionRequestHandler.js b/js/src/Ice/ConnectionRequestHandler.js
index 84e0c1a42a7..45ce1a8d29d 100644
--- a/js/src/Ice/ConnectionRequestHandler.js
+++ b/js/src/Ice/ConnectionRequestHandler.js
@@ -12,12 +12,11 @@ const ReferenceMode = Ice.ReferenceMode;
class ConnectionRequestHandler
{
- constructor(ref, connection, compress)
+ constructor(ref, connection)
{
this._reference = ref;
this._response = ref.getMode() == ReferenceMode.ModeTwoway;
this._connection = connection;
- this._compress = compress;
}
update(previousHandler, newHandler)
@@ -47,7 +46,7 @@ class ConnectionRequestHandler
sendAsyncRequest(out)
{
- return out.invokeRemote(this._connection, this._compress, this._response);
+ return out.invokeRemote(this._connection, this._response);
}
asyncRequestCanceled(out)
diff --git a/js/src/Ice/DefaultsAndOverrides.js b/js/src/Ice/DefaultsAndOverrides.js
index 420cdd048ea..753f48c46c1 100644
--- a/js/src/Ice/DefaultsAndOverrides.js
+++ b/js/src/Ice/DefaultsAndOverrides.js
@@ -11,9 +11,9 @@
const Ice = require("../Ice/ModuleRegistry").Ice;
Ice._ModuleRegistry.require(module,
[
- "../Ice/FormatType",
- "../Ice/EndpointTypes",
- "../Ice/Protocol",
+ "../Ice/FormatType",
+ "../Ice/EndpointTypes",
+ "../Ice/Protocol",
"../Ice/LocalException"
]);
@@ -88,7 +88,6 @@ class DefaultsAndOverrides
this.overrideCloseTimeoutValue = -1;
}
- this.overrideCompress = false;
this.overrideSecure = false;
value = properties.getPropertyWithDefault("Ice.Default.EndpointSelection", "Random");
diff --git a/js/src/Ice/IncomingAsync.js b/js/src/Ice/IncomingAsync.js
index 92d074dda71..b46fd8a5c77 100644
--- a/js/src/Ice/IncomingAsync.js
+++ b/js/src/Ice/IncomingAsync.js
@@ -33,11 +33,10 @@ const StringUtil = Ice.StringUtil;
class IncomingAsync
{
- constructor(instance, connection, adapter, response, compress, requestId)
+ constructor(instance, connection, adapter, response, requestId)
{
this._instance = instance;
this._response = response;
- this._compress = compress;
this._connection = connection;
this._format = Ice.FormatType.DefaultFormat;
@@ -230,7 +229,7 @@ class IncomingAsync
this._os.writeString(ex.operation);
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -251,7 +250,7 @@ class IncomingAsync
this._os.writeInt(this._current.requestId);
this._os.writeByte(Protocol.replyUnknownLocalException);
this._os.writeString(ex.unknown);
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -272,7 +271,7 @@ class IncomingAsync
this._os.writeInt(this._current.requestId);
this._os.writeByte(Protocol.replyUnknownUserException);
this._os.writeString(ex.unknown);
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -293,7 +292,7 @@ class IncomingAsync
this._os.writeInt(this._current.requestId);
this._os.writeByte(Protocol.replyUnknownException);
this._os.writeString(ex.unknown);
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -321,7 +320,7 @@ class IncomingAsync
s.push(ex.stack);
}
this._os.writeString(s.join(""));
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -339,7 +338,7 @@ class IncomingAsync
this._os.startEncapsulation(this._current.encoding, this._format);
this._os.writeUserException(ex);
this._os.endEncapsulation();
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -361,7 +360,7 @@ class IncomingAsync
this._os.writeByte(Protocol.replyUnknownException);
//this._os.writeString(ex.toString());
this._os.writeString(ex.toString() + (ex.stack ? "\n" + ex.stack : ""));
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
@@ -526,7 +525,7 @@ class IncomingAsync
if(this._response)
{
- this._connection.sendResponse(this._os, this._compress);
+ this._connection.sendResponse(this._os);
}
else
{
diff --git a/js/src/Ice/ObjectPrx.js b/js/src/Ice/ObjectPrx.js
index d68043eac47..506587e90d6 100644
--- a/js/src/Ice/ObjectPrx.js
+++ b/js/src/Ice/ObjectPrx.js
@@ -44,7 +44,7 @@ class ObjectPrx
this._reference = null;
this._requestHandler = null;
}
-
+
hashCode(r)
{
return this._reference.hashCode();
@@ -411,19 +411,6 @@ class ObjectPrx
}
}
- ice_compress(co)
- {
- const ref = this._reference.changeCompress(co);
- if(ref.equals(this._reference))
- {
- return this;
- }
- else
- {
- return this._newInstance(ref);
- }
- }
-
ice_timeout(t)
{
if(t < 1 && t !== -1)
@@ -661,7 +648,7 @@ class ObjectPrx
}
return false;
}
-
+
//
// Generic invocation for operations that have input parameters.
//
@@ -878,7 +865,7 @@ class ObjectPrx
return false;
}
-
+
static ice_staticId()
{
return this._id;
diff --git a/js/src/Ice/OutgoingAsync.js b/js/src/Ice/OutgoingAsync.js
index bce53bc29a9..168dd78976f 100644
--- a/js/src/Ice/OutgoingAsync.js
+++ b/js/src/Ice/OutgoingAsync.js
@@ -102,7 +102,7 @@ class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
{
this.markFinishedEx(ex);
}
-
+
invokeImpl(userThread)
{
try
@@ -182,7 +182,7 @@ class ProxyOutgoingAsyncBase extends OutgoingAsyncBase
}
super.markFinishedEx.call(this, ex);
}
-
+
handleException(ex)
{
const interval = { value: 0 };
@@ -276,9 +276,9 @@ class OutgoingAsync extends ProxyOutgoingAsyncBase
this.markSent(!this._proxy.ice_isTwoway());
}
- invokeRemote(connection, compress, response)
+ invokeRemote(connection, response)
{
- return connection.sendAsyncRequest(this, compress, response, 0);
+ return connection.sendAsyncRequest(this, response, 0);
}
abort(ex)
@@ -507,14 +507,14 @@ class ProxyFlushBatch extends ProxyOutgoingAsyncBase
this._batchRequestNum = prx._getBatchRequestQueue().swap(this._os);
}
- invokeRemote(connection, compress, response)
+ invokeRemote(connection, response)
{
if(this._batchRequestNum === 0)
{
this.sent();
return AsyncStatus.Sent;
}
- return connection.sendAsyncRequest(this, compress, response, this._batchRequestNum);
+ return connection.sendAsyncRequest(this, response, this._batchRequestNum);
}
invoke()
@@ -531,12 +531,12 @@ class ProxyGetConnection extends ProxyOutgoingAsyncBase
super(prx, operation);
}
- invokeRemote(connection, compress, response)
+ invokeRemote(connection, response)
{
this.markFinished(true, r => r.resolve(connection));
return AsyncStatus.Sent;
}
-
+
invoke()
{
this.invokeImpl(true); // userThread = true
@@ -563,7 +563,7 @@ class ConnectionFlushBatch extends OutgoingAsyncBase
}
else
{
- status = this._connection.sendAsyncRequest(this, false, false, batchRequestNum);
+ status = this._connection.sendAsyncRequest(this, false, batchRequestNum);
}
if((status & AsyncStatus.Sent) > 0)
@@ -596,7 +596,7 @@ class HeartbeatAsync extends OutgoingAsyncBase
this._os.writeByte(0);
this._os.writeInt(Protocol.headerSize); // Message size.
- let status = this._connection.sendAsyncRequest(this, false, false, 0);
+ let status = this._connection.sendAsyncRequest(this, false, 0);
if((status & AsyncStatus.Sent) > 0)
{
diff --git a/js/src/Ice/OutgoingConnectionFactory.js b/js/src/Ice/OutgoingConnectionFactory.js
index 860d78fd522..48dfabe7ce9 100644
--- a/js/src/Ice/OutgoingConnectionFactory.js
+++ b/js/src/Ice/OutgoingConnectionFactory.js
@@ -73,7 +73,7 @@ class OutgoingConnectionFactory
}
//
- // Returns a promise, success callback receives (connection, compress)
+ // Returns a promise, success callback receives the connection
//
create(endpts, hasMore, selType)
{
@@ -89,11 +89,10 @@ class OutgoingConnectionFactory
//
try
{
- const compress = { value: false };
- const connection = this.findConnectionByEndpoint(endpoints, compress);
+ const connection = this.findConnectionByEndpoint(endpoints);
if(connection !== null)
{
- return Ice.Promise.resolve([connection, compress.value]);
+ return Ice.Promise.resolve(connection);
}
}
catch(ex)
@@ -217,7 +216,7 @@ class OutgoingConnectionFactory
});
}
- findConnectionByEndpoint(endpoints, compress)
+ findConnectionByEndpoint(endpoints)
{
if(this._destroyed)
{
@@ -246,14 +245,6 @@ class OutgoingConnectionFactory
{
if(connectionList[j].isActiveOrHolding()) // Don't return destroyed or un-validated connections
{
- if(defaultsAndOverrides.overrideCompress)
- {
- compress.value = defaultsAndOverrides.overrideCompressValue;
- }
- else
- {
- compress.value = endpoint.compress();
- }
return connectionList[j];
}
}
@@ -289,7 +280,7 @@ class OutgoingConnectionFactory
}
}
- getConnection(endpoints, cb, compress)
+ getConnection(endpoints, cb)
{
if(this._destroyed)
{
@@ -322,7 +313,7 @@ class OutgoingConnectionFactory
//
// Search for a matching connection. If we find one, we're done.
//
- const connection = this.findConnectionByEndpoint(endpoints, compress);
+ const connection = this.findConnectionByEndpoint(endpoints);
if(connection !== null)
{
return connection;
@@ -447,12 +438,8 @@ class OutgoingConnectionFactory
callbacks.forEach(cc => cc.removeFromPending());
- const defaultsAndOverrides = this._instance.defaultsAndOverrides();
- const compress = defaultsAndOverrides.overrideCompress ? defaultsAndOverrides.overrideCompressValue :
- endpoint.compress();
-
callbacks.forEach(cc => cc.getConnection());
- connectionCallbacks.forEach(cc => cc.setConnection(connection, compress));
+ connectionCallbacks.forEach(cc => cc.setConnection(connection));
this.checkFinished();
}
@@ -748,13 +735,13 @@ class ConnectCallback
}
}
- setConnection(connection, compress)
+ setConnection(connection)
{
//
// Callback from the factory: the connection to one of the callback
// connectors has been established.
//
- this._promise.resolve([connection, compress]);
+ this._promise.resolve(connection);
this._factory.decPendingConnectCount(); // Must be called last.
}
@@ -824,8 +811,7 @@ class ConnectCallback
//
// Ask the factory to get a connection.
//
- const compress = { value: false };
- const connection = this._factory.getConnection(this._endpoints, this, compress);
+ const connection = this._factory.getConnection(this._endpoints, this);
if(connection === null)
{
//
@@ -837,7 +823,7 @@ class ConnectCallback
return;
}
- this._promise.resolve([connection, compress.value]);
+ this._promise.resolve(connection);
this._factory.decPendingConnectCount(); // Must be called last.
}
catch(ex)
diff --git a/js/src/Ice/Reference.js b/js/src/Ice/Reference.js
index 043947c5191..86b4d2ea4ca 100644
--- a/js/src/Ice/Reference.js
+++ b/js/src/Ice/Reference.js
@@ -877,8 +877,6 @@ class Reference
this._encoding = encoding;
this._invocationTimeout = invocationTimeout;
this._hashInitialized = false;
- this._overrideCompress = false;
- this._compress = false; // Only used if _overrideCompress == true
}
getMode()
@@ -1083,18 +1081,6 @@ class Reference
return r;
}
- changeCompress(newCompress)
- {
- if(this._overrideCompress && this._compress === newCompress)
- {
- return this;
- }
- const r = this._instance.referenceFactory().copy(this);
- r._compress = newCompress;
- r._overrideCompress = true;
- return r;
- }
-
changeAdapterId(newAdapterId)
{
// Abstract
@@ -1185,11 +1171,6 @@ class Reference
}
}
h = HashUtil.addString(h, this._facet);
- h = HashUtil.addBoolean(h, this._overrideCompress);
- if(this._overrideCompress)
- {
- h = HashUtil.addBoolean(h, this._compress);
- }
h = HashUtil.addHashable(h, this._protocol);
h = HashUtil.addHashable(h, this._encoding);
h = HashUtil.addNumber(h, this._invocationTimeout);
@@ -1424,15 +1405,6 @@ class Reference
return false;
}
- if(this._overrideCompress !== r._overrideCompress)
- {
- return false;
- }
- if(this._overrideCompress && this._compress !== r._compress)
- {
- return false;
- }
-
if(!this._protocol.equals(r._protocol))
{
return false;
@@ -1464,8 +1436,6 @@ class Reference
// Copy the members that are not passed to the constructor.
//
r._context = this._context;
- r._overrideCompress = this._overrideCompress;
- r._compress = this._compress;
}
}
@@ -1644,23 +1614,9 @@ class FixedReference extends Reference
this._fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- let compress;
- if(defaultsAndOverrides.overrideCompress)
- {
- compress = defaultsAndOverrides.overrideCompressValue;
- }
- else if(this._overrideCompress)
- {
- compress = this._compress;
- }
- else
- {
- compress = this._fixedConnection.endpoint().compress();
- }
-
- return proxy._setRequestHandler(new ConnectionRequestHandler(this, this._fixedConnection, compress));
+ return proxy._setRequestHandler(new ConnectionRequestHandler(this, this._fixedConnection));
}
-
+
getBatchRequestQueue()
{
return this._fixedConnection.getBatchRequestQueue();
@@ -1775,16 +1731,6 @@ class RoutableReference extends Reference
return r;
}
- changeCompress(newCompress)
- {
- const r = super.changeCompress(newCompress);
- if(r !== this && this._endpoints.length > 0) // Also override the compress flag on the endpoints if it was updated.
- {
- r._endpoints = this._endpoints.map(endpoint => endpoint.changeCompress(newCompress));
- }
- return r;
- }
-
changeAdapterId(newAdapterId)
{
if(this._adapterId === newAdapterId)
@@ -2094,7 +2040,7 @@ class RoutableReference extends Reference
getConnection()
{
- const p = new Ice.Promise(); // success callback receives (connection, compress)
+ const p = new Ice.Promise(); // success callback receives (connection)
if(this._routerInfo !== null)
{
@@ -2225,10 +2171,6 @@ class RoutableReference extends Reference
for(let i = 0; i < endpts.length; ++i)
{
endpts[i] = endpts[i].changeConnectionId(this._connectionId);
- if(this._overrideCompress)
- {
- endpts[i] = endpts[i].changeCompress(this._compress);
- }
if(this._overrideTimeout)
{
endpts[i] = endpts[i].changeTimeout(this._timeout);
@@ -2352,7 +2294,7 @@ class RoutableReference extends Reference
//
const cb = new CreateConnectionCallback(this, null, promise);
factory.create(endpoints, false, this.getEndpointSelection()).then(
- values => cb.setConnection(values)).catch(ex => cb.setException(ex));
+ connection => cb.setConnection(connection)).catch(ex => cb.setException(ex));
}
else
{
@@ -2365,7 +2307,7 @@ class RoutableReference extends Reference
//
const cb = new CreateConnectionCallback(this, endpoints, promise);
factory.create([ endpoints[0] ], true, this.getEndpointSelection()).then(
- values => cb.setConnection(values)).catch(ex => cb.setException(ex));
+ connection => cb.setConnection(connection)).catch(ex => cb.setException(ex));
}
return promise;
}
@@ -2385,9 +2327,8 @@ class CreateConnectionCallback
this.exception = null;
}
- setConnection(values)
+ setConnection(connection)
{
- const [connection, compress] = values;
//
// If we have a router, set the object adapter for this router
// (if any) to the new connection, so that callbacks from the
@@ -2397,7 +2338,7 @@ class CreateConnectionCallback
{
connection.setAdapter(this.ref.getRouterInfo().getAdapter());
}
- this.promise.resolve(values);
+ this.promise.resolve(connection);
}
setException(ex)
@@ -2414,9 +2355,9 @@ class CreateConnectionCallback
}
this.ref.getInstance().outgoingConnectionFactory().create(
- [ this.endpoints[this.i] ],
- this.i != this.endpoints.length - 1,
- this.ref.getEndpointSelection()).then(values => this.setConnection(values))
+ [ this.endpoints[this.i] ],
+ this.i != this.endpoints.length - 1,
+ this.ref.getEndpointSelection()).then(connection => this.setConnection(connection))
.catch(ex => this.setException(ex));
}
}
diff --git a/js/src/Ice/RequestHandlerFactory.js b/js/src/Ice/RequestHandlerFactory.js
index fdfbcffb766..5256b61e950 100644
--- a/js/src/Ice/RequestHandlerFactory.js
+++ b/js/src/Ice/RequestHandlerFactory.js
@@ -50,9 +50,9 @@ class RequestHandlerFactory
if(connect)
{
- ref.getConnection().then(values =>
+ ref.getConnection().then(connection =>
{
- handler.setConnection(values);
+ handler.setConnection(connection);
},
ex =>
{
diff --git a/js/test/Ice/proxy/Client.js b/js/test/Ice/proxy/Client.js
index e8d14bc0254..a93eae5b46b 100644
--- a/js/test/Ice/proxy/Client.js
+++ b/js/test/Ice/proxy/Client.js
@@ -698,8 +698,9 @@
test(compObj.ice_connectionId("id1").ice_getConnectionId() === "id1");
test(compObj.ice_connectionId("id2").ice_getConnectionId() === "id2");
- test(compObj.ice_compress(true).equals(compObj.ice_compress(true)));
- test(!compObj.ice_compress(false).equals(compObj.ice_compress(true)));
+ // Proxy doesn't support ice_compress
+ //test(compObj.ice_compress(true).equals(compObj.ice_compress(true)));
+ //test(!compObj.ice_compress(false).equals(compObj.ice_compress(true)));
test(compObj.ice_timeout(20).equals(compObj.ice_timeout(20)));
test(!compObj.ice_timeout(10).equals(compObj.ice_timeout(20)));
diff --git a/objective-c/src/Ice/CommunicatorI.mm b/objective-c/src/Ice/CommunicatorI.mm
index 5bd82866953..050e5d9b0e7 100644
--- a/objective-c/src/Ice/CommunicatorI.mm
+++ b/objective-c/src/Ice/CommunicatorI.mm
@@ -445,12 +445,12 @@
{
@throw [ICEFeatureNotSupportedException featureNotSupportedException:__FILE__ line:__LINE__];
}
--(void) flushBatchRequests
+-(void) flushBatchRequests:(ICECompressBatch)compress
{
NSException* nsex = nil;
try
{
- COMMUNICATOR->flushBatchRequests();
+ COMMUNICATOR->flushBatchRequests((Ice::CompressBatch)compress);
}
catch(const std::exception& ex)
{
@@ -461,22 +461,24 @@
@throw nsex;
}
}
--(id<ICEAsyncResult>) begin_flushBatchRequests
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress
{
return beginCppCall(^(Ice::AsyncResultPtr& result)
{
- result = COMMUNICATOR->begin_flushBatchRequests();
+ result = COMMUNICATOR->begin_flushBatchRequests((Ice::CompressBatch)compress);
});
}
--(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress exception:(void(^)(ICEException*))exception
{
- return [self begin_flushBatchRequests:exception sent:nil];
+ return [self begin_flushBatchRequests:compress exception:exception sent:nil];
}
--(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress
+ exception:(void(^)(ICEException*))exception
+ sent:(void(^)(BOOL))sent
{
return beginCppCall(^(Ice::AsyncResultPtr& result, const Ice::CallbackPtr& cb)
{
- result = COMMUNICATOR->begin_flushBatchRequests(cb);
+ result = COMMUNICATOR->begin_flushBatchRequests((Ice::CompressBatch)compress, cb);
},
^(const Ice::AsyncResultPtr& result) {
COMMUNICATOR->end_flushBatchRequests(result);
diff --git a/objective-c/src/Ice/ConnectionI.mm b/objective-c/src/Ice/ConnectionI.mm
index 07f6b6be8bd..f1775726ee8 100644
--- a/objective-c/src/Ice/ConnectionI.mm
+++ b/objective-c/src/Ice/ConnectionI.mm
@@ -326,12 +326,12 @@ private:
@throw nsex;
return nil; // Keep the compiler happy.
}
--(void) flushBatchRequests
+-(void) flushBatchRequests:(ICECompressBatch)compress
{
NSException* nsex = nil;
try
{
- CONNECTION->flushBatchRequests();
+ CONNECTION->flushBatchRequests((Ice::CompressBatch)compress);
}
catch(const std::exception& ex)
{
@@ -342,22 +342,24 @@ private:
@throw nsex;
}
}
--(id<ICEAsyncResult>) begin_flushBatchRequests
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress
{
return beginCppCall(^(Ice::AsyncResultPtr& result)
{
- result = CONNECTION->begin_flushBatchRequests();
+ result = CONNECTION->begin_flushBatchRequests((Ice::CompressBatch)compress);
});
}
--(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress exception:(void(^)(ICEException*))exception
{
- return [self begin_flushBatchRequests:exception sent:nil];
+ return [self begin_flushBatchRequests:compress exception:exception sent:nil];
}
--(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent
+-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress
+ exception:(void(^)(ICEException*))exception
+ sent:(void(^)(BOOL))sent
{
return beginCppCall(^(Ice::AsyncResultPtr& result, const Ice::CallbackPtr& cb)
{
- result = CONNECTION->begin_flushBatchRequests(cb);
+ result = CONNECTION->begin_flushBatchRequests((Ice::CompressBatch)compress, cb);
},
^(const Ice::AsyncResultPtr& result) {
CONNECTION->end_flushBatchRequests(result);
diff --git a/objective-c/test/Ice/ami/AllTests.m b/objective-c/test/Ice/ami/AllTests.m
index 3f1896cb4e7..dfa478c18d1 100644
--- a/objective-c/test/Ice/ami/AllTests.m
+++ b/objective-c/test/Ice/ami/AllTests.m
@@ -585,12 +585,13 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
[b1 opBatch];
[b1 opBatch];
TestAMICallback* cb = [TestAMICallback create];
- id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:^(ICEException* ex)
- {
- test(NO);
- }
- sent:^(BOOL sentSynchronously) { [cb called]; }];
- [cb check];
+ id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:ICEBasedOnProxy
+ exception:^(ICEException* ex)
+ {
+ test(NO);
+ }
+ sent:^(BOOL sentSynchronously) { [cb called]; }];
+ [cb check];
test([r isSent]);
test([r isCompleted]);
test([p waitForBatch:2]);
@@ -605,8 +606,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
[b1 opBatch];
[[b1 ice_getConnection] close:ICECloseGracefullyAndWait];
TestAMICallback* cb = [TestAMICallback create];
- id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:
- ^(ICEException* ex) { [cb called]; }
+ id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:ICEBasedOnProxy
+ exception:^(ICEException* ex) { [cb called]; }
sent:^(BOOL sentSynchronously) { test(NO); }];
[cb check];
test(![r isSent]);
@@ -627,7 +628,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
[b1 opBatch];
[b1 opBatch];
TestAMICallback* cb = [TestAMICallback create];
- id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:^(ICEException* ex) { test(NO); }
+ id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:ICEBasedOnProxy
+ exception:^(ICEException* ex) { test(NO); }
sent:^(BOOL sentSynchronously) { [cb called]; }];
[cb check];
test([r isSent]);
@@ -644,7 +646,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
[b1 opBatch];
[[b1 ice_getConnection] close:ICECloseGracefullyAndWait];
TestAMICallback* cb = [TestAMICallback create];
- id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:^(ICEException* ex) { test(NO); }
+ id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:ICEBasedOnProxy
+ exception:^(ICEException* ex) { test(NO); }
sent:^(BOOL sentSynchronously) { [cb called]; }];
[cb check];
test([r isSent]);
@@ -758,7 +761,7 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
id<ICEConnection> con = [p ice_getConnection];
p2 = [p ice_batchOneway];
[p2 ice_ping];
- r = [con begin_flushBatchRequests];
+ r = [con begin_flushBatchRequests:ICEBasedOnProxy];
test([[r getConnection] isEqual:con]);
test([r getCommunicator] == communicator);
test([r getProxy] == nil); // Expected
@@ -769,7 +772,7 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated)
//
p2 = [p ice_batchOneway];
[p2 ice_ping];
- r = [communicator begin_flushBatchRequests];
+ r = [communicator begin_flushBatchRequests:ICEBasedOnProxy];
test([r getConnection] == nil); // Expected
test([r getCommunicator] == communicator);
test([r getProxy] == nil); // Expected
diff --git a/php/src/php5/Communicator.cpp b/php/src/php5/Communicator.cpp
index c22723e53d7..964a255599d 100644
--- a/php/src/php5/Communicator.cpp
+++ b/php/src/php5/Communicator.cpp
@@ -807,17 +807,25 @@ ZEND_METHOD(Ice_Communicator, setDefaultLocator)
ZEND_METHOD(Ice_Communicator, flushBatchRequests)
{
- CommunicatorInfoIPtr _this = Wrapper<CommunicatorInfoIPtr>::value(getThis() TSRMLS_CC);
- assert(_this);
+ zval* compress;
+ if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, const_cast<char*>("z"), &compress TSRMLS_CC) != SUCCESS)
+ {
+ RETURN_NULL();
+ }
- if(ZEND_NUM_ARGS() != 8)
+ if(Z_TYPE_P(compress) != IS_LONG)
{
- WRONG_PARAM_COUNT;
+ invalidArgument("value for 'compress' argument must be an enumerator of CompressBatch" TSRMLS_CC);
+ RETURN_NULL();
}
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(Z_LVAL_P(compress));
+
+ CommunicatorInfoIPtr _this = Wrapper<CommunicatorInfoIPtr>::value(getThis() TSRMLS_CC);
+ assert(_this);
try
{
- _this->getCommunicator()->flushBatchRequests();
+ _this->getCommunicator()->flushBatchRequests(cb);
}
catch(const IceUtil::Exception& ex)
{
diff --git a/php/src/php5/Connection.cpp b/php/src/php5/Connection.cpp
index 68827e63243..12aa1a7c2aa 100644
--- a/php/src/php5/Connection.cpp
+++ b/php/src/php5/Connection.cpp
@@ -128,17 +128,25 @@ ZEND_METHOD(Ice_Connection, getEndpoint)
ZEND_METHOD(Ice_Connection, flushBatchRequests)
{
- if(ZEND_NUM_ARGS() > 0)
+ zval* compress;
+ if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, const_cast<char*>("z"), &compress TSRMLS_CC) != SUCCESS)
{
- WRONG_PARAM_COUNT;
+ RETURN_NULL();
+ }
+
+ if(Z_TYPE_P(compress) != IS_LONG)
+ {
+ invalidArgument("value for 'compress' argument must be an enumerator of CompressBatch" TSRMLS_CC);
+ RETURN_NULL();
}
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(Z_LVAL_P(compress));
Ice::ConnectionPtr _this = Wrapper<Ice::ConnectionPtr>::value(getThis() TSRMLS_CC);
assert(_this);
try
{
- _this->flushBatchRequests();
+ _this->flushBatchRequests(cb);
}
catch(const IceUtil::Exception& ex)
{
diff --git a/php/test/Ice/operations/Client.php b/php/test/Ice/operations/Client.php
index 9a79908a125..08b0838674e 100644
--- a/php/test/Ice/operations/Client.php
+++ b/php/test/Ice/operations/Client.php
@@ -1090,6 +1090,8 @@ function twoways($communicator, $p)
function allTests($communicator)
{
+ global $NS;
+
$ref = "test:default -p 12010";
$base = $communicator->stringToProxy($ref);
$cl = $base->ice_checkedCast("::Test::MyClass");
@@ -1102,10 +1104,17 @@ function allTests($communicator)
$derived->opDerived();
echo "ok\n";
+ # Test flush batch requests methods
+ $BasedOnProxy = $NS ? constant("Ice\\CompressBatch::BasedOnProxy") : constant("Ice_CompressBatch::BasedOnProxy");
+
+ $derived->ice_flushBatchRequests();
+ $derived->ice_getConnection()->flushBatchRequests($BasedOnProxy);
+ $derived->ice_getCommunicator()->flushBatchRequests($BasedOnProxy);
+
return $cl;
}
-$communicator = $NS ? eval("return Ice\\initialize(\$argv);") :
+$communicator = $NS ? eval("return Ice\\initialize(\$argv);") :
eval("return Ice_initialize(\$argv);");
$myClass = allTests($communicator);
diff --git a/python/modules/IcePy/Communicator.cpp b/python/modules/IcePy/Communicator.cpp
index 927c3cab5d2..37a99a78641 100644
--- a/python/modules/IcePy/Communicator.cpp
+++ b/python/modules/IcePy/Communicator.cpp
@@ -701,13 +701,24 @@ communicatorIdentityToString(CommunicatorObject* self, PyObject* args)
extern "C"
#endif
static PyObject*
-communicatorFlushBatchRequests(CommunicatorObject* self)
+communicatorFlushBatchRequests(CommunicatorObject* self, PyObject* args)
{
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ PyObject* compressBatch;
+ if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch))
+ {
+ return 0;
+ }
+
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
assert(self->communicator);
try
{
AllowThreads allowThreads; // Release Python's global interpreter lock to avoid a potential deadlock.
- (*self->communicator)->flushBatchRequests();
+ (*self->communicator)->flushBatchRequests(cb);
}
catch(const Ice::Exception& ex)
{
@@ -723,13 +734,24 @@ communicatorFlushBatchRequests(CommunicatorObject* self)
extern "C"
#endif
static PyObject*
-communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* /*args*/, PyObject* /*kwds*/)
+communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* args, PyObject* /*kwds*/)
{
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ PyObject* compressBatch;
+ if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch))
+ {
+ return 0;
+ }
+
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
assert(self->communicator);
const string op = "flushBatchRequests";
FlushAsyncCallbackPtr d = new FlushAsyncCallback(op);
- Ice::Callback_Communicator_flushBatchRequestsPtr cb =
+ Ice::Callback_Communicator_flushBatchRequestsPtr callback =
Ice::newCallback_Communicator_flushBatchRequests(d, &FlushAsyncCallback::exception, &FlushAsyncCallback::sent);
Ice::AsyncResultPtr result;
@@ -738,7 +760,7 @@ communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* /*args*/
{
AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations.
- result = (*self->communicator)->begin_flushBatchRequests(cb);
+ result = (*self->communicator)->begin_flushBatchRequests(cb, callback);
}
catch(const Ice::Exception& ex)
{
@@ -771,17 +793,29 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py
static char* argNames[] =
{
+ const_cast<char*>("compress"),
const_cast<char*>("_ex"),
const_cast<char*>("_sent"),
0
};
+ PyObject* compressBatch;
PyObject* ex = Py_None;
PyObject* sent = Py_None;
- if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("|OO"), argNames, &ex, &sent))
+ if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("O|OO"), argNames, &compressBatch, &ex, &sent))
+ {
+ return 0;
+ }
+
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ if(!PyObject_IsInstance(compressBatch, reinterpret_cast<PyObject*>(compressBatchType)))
{
return 0;
}
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
if(ex == Py_None)
{
ex = 0;
@@ -798,11 +832,11 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py
return 0;
}
- Ice::Callback_Communicator_flushBatchRequestsPtr cb;
+ Ice::Callback_Communicator_flushBatchRequestsPtr callback;
if(ex || sent)
{
FlushCallbackPtr d = new FlushCallback(ex, sent, "flushBatchRequests");
- cb = Ice::newCallback_Communicator_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent);
+ callback = Ice::newCallback_Communicator_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent);
}
Ice::AsyncResultPtr result;
@@ -810,13 +844,13 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py
{
AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations.
- if(cb)
+ if(callback)
{
- result = (*self->communicator)->begin_flushBatchRequests(cb);
+ result = (*self->communicator)->begin_flushBatchRequests(cb, callback);
}
else
{
- result = (*self->communicator)->begin_flushBatchRequests();
+ result = (*self->communicator)->begin_flushBatchRequests(cb);
}
}
catch(const Ice::Exception& ex)
@@ -1648,13 +1682,13 @@ static PyMethodDef CommunicatorMethods[] =
PyDoc_STR(STRCAST("getDefaultLocator() -> proxy")) },
{ STRCAST("setDefaultLocator"), reinterpret_cast<PyCFunction>(communicatorSetDefaultLocator), METH_VARARGS,
PyDoc_STR(STRCAST("setDefaultLocator(proxy) -> None")) },
- { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequests), METH_NOARGS,
- PyDoc_STR(STRCAST("flushBatchRequests() -> None")) },
+ { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequests), METH_VARARGS,
+ PyDoc_STR(STRCAST("flushBatchRequests(compress) -> None")) },
{ STRCAST("flushBatchRequestsAsync"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequestsAsync),
- METH_NOARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync() -> Ice.Future")) },
+ METH_VARARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync(compress) -> Ice.Future")) },
{ STRCAST("begin_flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorBeginFlushBatchRequests),
METH_VARARGS | METH_KEYWORDS,
- PyDoc_STR(STRCAST("begin_flushBatchRequests([_ex][, _sent]) -> Ice.AsyncResult")) },
+ PyDoc_STR(STRCAST("begin_flushBatchRequests(compress[, _ex][, _sent]) -> Ice.AsyncResult")) },
{ STRCAST("end_flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorEndFlushBatchRequests),
METH_VARARGS, PyDoc_STR(STRCAST("end_flushBatchRequests(Ice.AsyncResult) -> None")) },
{ STRCAST("createAdmin"), reinterpret_cast<PyCFunction>(communicatorCreateAdmin), METH_VARARGS,
diff --git a/python/modules/IcePy/Connection.cpp b/python/modules/IcePy/Connection.cpp
index 8e9e9286d02..056dc42e4ab 100644
--- a/python/modules/IcePy/Connection.cpp
+++ b/python/modules/IcePy/Connection.cpp
@@ -418,13 +418,24 @@ connectionGetAdapter(ConnectionObject* self)
extern "C"
#endif
static PyObject*
-connectionFlushBatchRequests(ConnectionObject* self)
+connectionFlushBatchRequests(ConnectionObject* self, PyObject* args)
{
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ PyObject* compressBatch;
+ if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch))
+ {
+ return 0;
+ }
+
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
assert(self->connection);
try
{
AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations.
- (*self->connection)->flushBatchRequests();
+ (*self->connection)->flushBatchRequests(cb);
}
catch(const Ice::Exception& ex)
{
@@ -440,13 +451,24 @@ connectionFlushBatchRequests(ConnectionObject* self)
extern "C"
#endif
static PyObject*
-connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* /*args*/, PyObject* /*kwds*/)
+connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* args, PyObject* /*kwds*/)
{
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ PyObject* compressBatch;
+ if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch))
+ {
+ return 0;
+ }
+
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
assert(self->connection);
const string op = "flushBatchRequests";
FlushAsyncCallbackPtr d = new FlushAsyncCallback(op);
- Ice::Callback_Connection_flushBatchRequestsPtr cb =
+ Ice::Callback_Connection_flushBatchRequestsPtr callback =
Ice::newCallback_Connection_flushBatchRequests(d, &FlushAsyncCallback::exception, &FlushAsyncCallback::sent);
Ice::AsyncResultPtr result;
@@ -455,7 +477,7 @@ connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* /*args*/, Py
{
AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations.
- result = (*self->connection)->begin_flushBatchRequests(cb);
+ result = (*self->connection)->begin_flushBatchRequests(cb, callback);
}
catch(const Ice::Exception& ex)
{
@@ -490,17 +512,29 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje
static char* argNames[] =
{
+ const_cast<char*>("compress"),
const_cast<char*>("_ex"),
const_cast<char*>("_sent"),
0
};
+ PyObject* compressBatch;
PyObject* ex = Py_None;
PyObject* sent = Py_None;
- if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("|OO"), argNames, &ex, &sent))
+ if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("O|OO"), argNames, &compressBatch, &ex, &sent))
+ {
+ return 0;
+ }
+
+ PyObject* compressBatchType = lookupType("Ice.CompressBatch");
+ if(!PyObject_IsInstance(compressBatch, reinterpret_cast<PyObject*>(compressBatchType)))
{
return 0;
}
+ PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value"));
+ assert(v.get());
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get()));
+
if(ex == Py_None)
{
ex = 0;
@@ -517,11 +551,11 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje
return 0;
}
- Ice::Callback_Connection_flushBatchRequestsPtr cb;
+ Ice::Callback_Connection_flushBatchRequestsPtr callback;
if(ex || sent)
{
FlushCallbackPtr d = new FlushCallback(ex, sent, "flushBatchRequests");
- cb = Ice::newCallback_Connection_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent);
+ callback = Ice::newCallback_Connection_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent);
}
Ice::AsyncResultPtr result;
@@ -529,13 +563,13 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje
{
AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations.
- if(cb)
+ if(callback)
{
- result = (*self->connection)->begin_flushBatchRequests(cb);
+ result = (*self->connection)->begin_flushBatchRequests(cb, callback);
}
else
{
- result = (*self->connection)->begin_flushBatchRequests();
+ result = (*self->connection)->begin_flushBatchRequests(cb);
}
}
catch(const Ice::Exception& ex)
@@ -1059,19 +1093,19 @@ connectionThrowException(ConnectionObject* self)
static PyMethodDef ConnectionMethods[] =
{
{ STRCAST("close"), reinterpret_cast<PyCFunction>(connectionClose), METH_VARARGS,
- PyDoc_STR(STRCAST("close(bool) -> None")) },
+ PyDoc_STR(STRCAST("close(Ice.ConnectionClose) -> None")) },
{ STRCAST("createProxy"), reinterpret_cast<PyCFunction>(connectionCreateProxy), METH_VARARGS,
PyDoc_STR(STRCAST("createProxy(Ice.Identity) -> Ice.ObjectPrx")) },
{ STRCAST("setAdapter"), reinterpret_cast<PyCFunction>(connectionSetAdapter), METH_VARARGS,
PyDoc_STR(STRCAST("setAdapter(Ice.ObjectAdapter) -> None")) },
{ STRCAST("getAdapter"), reinterpret_cast<PyCFunction>(connectionGetAdapter), METH_NOARGS,
PyDoc_STR(STRCAST("getAdapter() -> Ice.ObjectAdapter")) },
- { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequests), METH_NOARGS,
- PyDoc_STR(STRCAST("flushBatchRequests() -> None")) },
+ { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequests), METH_VARARGS,
+ PyDoc_STR(STRCAST("flushBatchRequests(Ice.CompressBatch) -> None")) },
{ STRCAST("flushBatchRequestsAsync"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequestsAsync),
- METH_NOARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync() -> Ice.Future")) },
+ METH_VARARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync(Ice.CompressBatch) -> Ice.Future")) },
{ STRCAST("begin_flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionBeginFlushBatchRequests),
- METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_flushBatchRequests([_ex][, _sent]) -> Ice.AsyncResult")) },
+ METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_flushBatchRequests(Ice.CompressBatch, [_ex][, _sent]) -> Ice.AsyncResult")) },
{ STRCAST("end_flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionEndFlushBatchRequests), METH_VARARGS,
PyDoc_STR(STRCAST("end_flushBatchRequests(Ice.AsyncResult) -> None")) },
{ STRCAST("setCloseCallback"), reinterpret_cast<PyCFunction>(connectionSetCloseCallback), METH_VARARGS,
diff --git a/python/python/Ice.py b/python/python/Ice.py
index 80b48c639fc..9ce81306fb7 100644
--- a/python/python/Ice.py
+++ b/python/python/Ice.py
@@ -938,14 +938,14 @@ class CommunicatorI(Communicator):
def getPluginManager(self):
raise RuntimeError("operation `getPluginManager' not implemented")
- def flushBatchRequests(self):
- self._impl.flushBatchRequests()
+ def flushBatchRequests(self, compress):
+ self._impl.flushBatchRequests(compress)
- def flushBatchRequestsAsync(self):
- return self._impl.flushBatchRequestsAsync()
+ def flushBatchRequestsAsync(self, compress):
+ return self._impl.flushBatchRequestsAsync(compress)
- def begin_flushBatchRequests(self, _ex=None, _sent=None):
- return self._impl.begin_flushBatchRequests(_ex, _sent)
+ def begin_flushBatchRequests(self, compress, _ex=None, _sent=None):
+ return self._impl.begin_flushBatchRequests(compress, _ex, _sent)
def end_flushBatchRequests(self, r):
return self._impl.end_flushBatchRequests(r)
diff --git a/python/test/Ice/ami/AllTests.py b/python/test/Ice/ami/AllTests.py
index d29311fd9d6..78c71233218 100644
--- a/python/test/Ice/ami/AllTests.py
+++ b/python/test/Ice/ami/AllTests.py
@@ -805,7 +805,7 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.opBatch()
cb = FlushCallback()
- r = b1.ice_getConnection().begin_flushBatchRequests(cb.exception, cb.sent)
+ r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent())
test(r.isCompleted())
@@ -819,7 +819,8 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.opBatch()
cb = FlushCallback(cookie)
- r = b1.ice_getConnection().begin_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie),
+ r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ lambda ex: cb.exceptionWC(ex, cookie),
lambda ss: cb.sentWC(ss, cookie))
cb.check()
test(p.waitForBatch(2))
@@ -832,7 +833,7 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FlushExCallback()
- r = b1.ice_getConnection().begin_flushBatchRequests(cb.exception, cb.sent)
+ r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(not r.isSent())
test(r.isCompleted())
@@ -846,7 +847,8 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FlushExCallback(cookie)
- r = b1.ice_getConnection().begin_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie),
+ r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy,
+ lambda ex: cb.exceptionWC(ex, cookie),
lambda ss: cb.sentWC(ss, cookie))
cb.check()
test(p.opBatchCount() == 0)
@@ -864,7 +866,7 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.opBatch()
cb = FlushCallback()
- r = communicator.begin_flushBatchRequests(cb.exception, cb.sent)
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent())
test(r.isCompleted())
@@ -878,7 +880,7 @@ def allTests(communicator, collocated):
b1.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FlushCallback()
- r = communicator.begin_flushBatchRequests(cb.exception, cb.sent)
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent()) # Exceptions are ignored!
test(r.isCompleted())
@@ -897,7 +899,7 @@ def allTests(communicator, collocated):
b2.opBatch()
b2.opBatch()
cb = FlushCallback()
- r = communicator.begin_flushBatchRequests(cb.exception, cb.sent)
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent())
test(r.isCompleted())
@@ -918,7 +920,7 @@ def allTests(communicator, collocated):
b2.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FlushCallback()
- r = communicator.begin_flushBatchRequests(cb.exception, cb.sent)
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent()) # Exceptions are ignored!
test(r.isCompleted())
@@ -939,7 +941,7 @@ def allTests(communicator, collocated):
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FlushCallback()
- r = communicator.begin_flushBatchRequests(cb.exception, cb.sent)
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent)
cb.check()
test(r.isSent()) # Exceptions are ignored!
test(r.isCompleted())
@@ -1041,7 +1043,7 @@ def allTests(communicator, collocated):
con = p.ice_getConnection()
p2 = p.ice_batchOneway()
p2.ice_ping()
- r = con.begin_flushBatchRequests()
+ r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy)
test(r.getConnection() == con)
test(r.getCommunicator() == communicator)
test(r.getProxy() == None) # Expected
@@ -1052,7 +1054,7 @@ def allTests(communicator, collocated):
#
p2 = p.ice_batchOneway()
p2.ice_ping()
- r = communicator.begin_flushBatchRequests()
+ r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy)
test(r.getConnection() == None) # Expected
test(r.getCommunicator() == communicator)
test(r.getProxy() == None) # Expected
@@ -1191,7 +1193,7 @@ def allTests(communicator, collocated):
#
# This test requires two threads in the server's thread pool: one will block in sleep() and the other
# will process the CloseConnection message.
- #
+ #
p.ice_ping()
con = p.ice_getConnection()
r = p.begin_sleep(1000)
@@ -1232,7 +1234,7 @@ def allTests(communicator, collocated):
#
# Local case: start a lengthy operation and then close the connection forcefully on the client side.
# There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
- #
+ #
p.ice_ping()
con = p.ice_getConnection()
r = p.begin_sleep(100)
@@ -1520,7 +1522,7 @@ def allTestsFuture(communicator, collocated):
b1.opBatch()
b1.opBatch()
cb = FutureFlushCallback()
- f = b1.ice_getConnection().flushBatchRequestsAsync()
+ f = b1.ice_getConnection().flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1533,7 +1535,7 @@ def allTestsFuture(communicator, collocated):
b1.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FutureFlushExCallback()
- f = b1.ice_getConnection().flushBatchRequestsAsync()
+ f = b1.ice_getConnection().flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_done_callback(cb.exception)
f.add_sent_callback(cb.sent)
cb.check()
@@ -1554,7 +1556,7 @@ def allTestsFuture(communicator, collocated):
b1.opBatch()
b1.opBatch()
cb = FutureFlushCallback()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1570,7 +1572,7 @@ def allTestsFuture(communicator, collocated):
b1.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FutureFlushCallback()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1591,7 +1593,7 @@ def allTestsFuture(communicator, collocated):
b2.opBatch()
b2.opBatch()
cb = FutureFlushCallback()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1614,7 +1616,7 @@ def allTestsFuture(communicator, collocated):
b2.opBatch()
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FutureFlushCallback()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1637,7 +1639,7 @@ def allTestsFuture(communicator, collocated):
b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait)
cb = FutureFlushCallback()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
f.add_sent_callback(cb.sent)
cb.check()
f.result() # Wait until finished.
@@ -1740,7 +1742,7 @@ def allTestsFuture(communicator, collocated):
con = p.ice_getConnection()
p2 = p.ice_batchOneway()
p2.ice_ping()
- f = con.flushBatchRequestsAsync()
+ f = con.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
test(f.connection() == con)
test(f.communicator() == communicator)
test(f.proxy() == None) # Expected
@@ -1751,7 +1753,7 @@ def allTestsFuture(communicator, collocated):
#
p2 = p.ice_batchOneway()
p2.ice_ping()
- f = communicator.flushBatchRequestsAsync()
+ f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy)
test(f.connection() == None) # Expected
test(f.communicator() == communicator)
test(f.proxy() == None) # Expected
diff --git a/python/test/Ice/operations/BatchOneways.py b/python/test/Ice/operations/BatchOneways.py
index ea0fc26c29c..3b17cd89954 100644
--- a/python/test/Ice/operations/BatchOneways.py
+++ b/python/test/Ice/operations/BatchOneways.py
@@ -64,6 +64,9 @@ def batchOneways(p):
batch = Test.MyClassPrx.uncheckedCast(p.ice_batchOneway())
batch.ice_flushBatchRequests() # Empty flush
+ if batch.ice_getConnection():
+ batch.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy)
+ batch.ice_getCommunicator().flushBatchRequests(Ice.CompressBatch.BasedOnProxy)
p.opByteSOnewayCallCount() # Reset the call count
diff --git a/ruby/src/IceRuby/Communicator.cpp b/ruby/src/IceRuby/Communicator.cpp
index 232092dd9bd..cb7c24fd9a5 100644
--- a/ruby/src/IceRuby/Communicator.cpp
+++ b/ruby/src/IceRuby/Communicator.cpp
@@ -613,12 +613,22 @@ IceRuby_Communicator_setDefaultLocator(VALUE self, VALUE locator)
extern "C"
VALUE
-IceRuby_Communicator_flushBatchRequests(VALUE self)
+IceRuby_Communicator_flushBatchRequests(VALUE self, VALUE compress)
{
ICE_RUBY_TRY
{
Ice::CommunicatorPtr p = getCommunicator(self);
- p->flushBatchRequests();
+
+ volatile VALUE type = callRuby(rb_path2class, "Ice::CompressBatch");
+ if(callRuby(rb_obj_is_instance_of, compress, type) != Qtrue)
+ {
+ throw RubyException(rb_eTypeError,
+ "value for 'compress' argument must be an enumerator of Ice::CompressBatch");
+ }
+ volatile VALUE compressValue = callRuby(rb_funcall, compress, rb_intern("to_i"), 0);
+ assert(TYPE(compressValue) == T_FIXNUM);
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(FIX2LONG(compressValue));
+ p->flushBatchRequests(cb);
}
ICE_RUBY_CATCH
return Qnil;
@@ -651,7 +661,7 @@ IceRuby::initCommunicator(VALUE iceModule)
rb_define_method(_communicatorClass, "setDefaultRouter", CAST_METHOD(IceRuby_Communicator_setDefaultRouter), 1);
rb_define_method(_communicatorClass, "getDefaultLocator", CAST_METHOD(IceRuby_Communicator_getDefaultLocator), 0);
rb_define_method(_communicatorClass, "setDefaultLocator", CAST_METHOD(IceRuby_Communicator_setDefaultLocator), 1);
- rb_define_method(_communicatorClass, "flushBatchRequests", CAST_METHOD(IceRuby_Communicator_flushBatchRequests), 0);
+ rb_define_method(_communicatorClass, "flushBatchRequests", CAST_METHOD(IceRuby_Communicator_flushBatchRequests), 1);
}
Ice::CommunicatorPtr
diff --git a/ruby/src/IceRuby/Connection.cpp b/ruby/src/IceRuby/Connection.cpp
index 2c5f65a8434..9c0a4b76ed1 100644
--- a/ruby/src/IceRuby/Connection.cpp
+++ b/ruby/src/IceRuby/Connection.cpp
@@ -57,7 +57,7 @@ IceRuby_Connection_close(VALUE self, VALUE mode)
if(callRuby(rb_obj_is_instance_of, mode, type) != Qtrue)
{
throw RubyException(rb_eTypeError,
- "value for 'mode' argument must be an enumerator of Ice.ConnectionClose");
+ "value for 'mode' argument must be an enumerator of Ice::ConnectionClose");
}
volatile VALUE modeValue = callRuby(rb_funcall, mode, rb_intern("to_i"), 0);
assert(TYPE(modeValue) == T_FIXNUM);
@@ -70,14 +70,23 @@ IceRuby_Connection_close(VALUE self, VALUE mode)
extern "C"
VALUE
-IceRuby_Connection_flushBatchRequests(VALUE self)
+IceRuby_Connection_flushBatchRequests(VALUE self, VALUE compress)
{
ICE_RUBY_TRY
{
Ice::ConnectionPtr* p = reinterpret_cast<Ice::ConnectionPtr*>(DATA_PTR(self));
assert(p);
- (*p)->flushBatchRequests();
+ volatile VALUE type = callRuby(rb_path2class, "Ice::CompressBatch");
+ if(callRuby(rb_obj_is_instance_of, compress, type) != Qtrue)
+ {
+ throw RubyException(rb_eTypeError,
+ "value for 'compress' argument must be an enumerator of Ice::CompressBatch");
+ }
+ volatile VALUE compressValue = callRuby(rb_funcall, compress, rb_intern("to_i"), 0);
+ assert(TYPE(compressValue) == T_FIXNUM);
+ Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(FIX2LONG(compressValue));
+ (*p)->flushBatchRequests(cb);
}
ICE_RUBY_CATCH
return Qnil;
@@ -416,7 +425,7 @@ IceRuby::initConnection(VALUE iceModule)
// Instance methods.
//
rb_define_method(_connectionClass, "close", CAST_METHOD(IceRuby_Connection_close), 1);
- rb_define_method(_connectionClass, "flushBatchRequests", CAST_METHOD(IceRuby_Connection_flushBatchRequests), 0);
+ rb_define_method(_connectionClass, "flushBatchRequests", CAST_METHOD(IceRuby_Connection_flushBatchRequests), 1);
rb_define_method(_connectionClass, "heartbeat", CAST_METHOD(IceRuby_Connection_heartbeat), 0);
rb_define_method(_connectionClass, "setACM", CAST_METHOD(IceRuby_Connection_setACM), 3);
rb_define_method(_connectionClass, "getACM", CAST_METHOD(IceRuby_Connection_getACM), 0);
diff --git a/ruby/test/Ice/operations/BatchOneways.rb b/ruby/test/Ice/operations/BatchOneways.rb
index 99b2225f61b..1935c52f1e8 100644
--- a/ruby/test/Ice/operations/BatchOneways.rb
+++ b/ruby/test/Ice/operations/BatchOneways.rb
@@ -13,6 +13,8 @@ def batchOneways(p)
batch = Test::MyClassPrx::uncheckedCast(p.ice_batchOneway())
batch.ice_flushBatchRequests() # Empty flush
+ batch.ice_getConnection().flushBatchRequests(Ice::CompressBatch::BasedOnProxy)
+ batch.ice_getCommunicator().flushBatchRequests(Ice::CompressBatch::BasedOnProxy)
p.opByteSOnewayCallCount() # Reset the call count
@@ -26,7 +28,7 @@ def batchOneways(p)
sleep(0.01)
end
- batch.ice_getConnection().flushBatchRequests()
+ batch.ice_getConnection().flushBatchRequests(Ice::CompressBatch::BasedOnProxy)
batch2 = Test::MyClassPrx::uncheckedCast(p.ice_batchOneway())
diff --git a/scripts/Expect.py b/scripts/Expect.py
index 7413244075d..05d768befab 100755
--- a/scripts/Expect.py
+++ b/scripts/Expect.py
@@ -613,7 +613,7 @@ class Expect (object):
try:
self.wait(timeout)
- if self.mapping == "java":
+ if self.mapping in ["java", "java-compat"]:
if self.killed is not None:
if win32:
test(self.exitstatus, -self.killed)
diff --git a/slice/Ice/Communicator.ice b/slice/Ice/Communicator.ice
index 49821037313..95c0cbadae1 100644
--- a/slice/Ice/Communicator.ice
+++ b/slice/Ice/Communicator.ice
@@ -23,6 +23,7 @@
#include <Ice/Current.ice>
#include <Ice/Properties.ice>
#include <Ice/FacetMap.ice>
+#include <Ice/Connection.ice>
/**
*
@@ -493,8 +494,11 @@ local interface Communicator
* for all connections associated with the communicator.
* Any errors that occur while flushing a connection are ignored.
*
+ * @param compress Specifies whether or not the queued batch requests
+ * should be compressed before being sent over the wire.
+ *
**/
- ["async-oneway"] void flushBatchRequests();
+ ["async-oneway"] void flushBatchRequests(CompressBatch compress);
/**
*
diff --git a/slice/Ice/Connection.ice b/slice/Ice/Connection.ice
index af7b03a5a38..08dd20ef498 100644
--- a/slice/Ice/Connection.ice
+++ b/slice/Ice/Connection.ice
@@ -24,6 +24,30 @@ module Ice
{
/**
+ * The batch compression option when flushing queued batch requests.
+ *
+ **/
+["cpp:unscoped"]
+local enum CompressBatch
+{
+ /**
+ * Compress the batch requests.
+ **/
+ Yes,
+
+ /**
+ * Don't compress the batch requests.
+ **/
+ No,
+
+ /**
+ * Compress the batch requests if at least one request was
+ * made on a compressed proxy.
+ **/
+ BasedOnProxy
+};
+
+/**
*
* Base class providing access to the connection details. *
**/
@@ -238,8 +262,11 @@ local interface Connection
* This means all batch requests invoked on fixed proxies
* associated with the connection.
*
+ * @param compress Specifies whether or not the queued batch requests
+ * should be compressed before being sent over the wire.
+ *
**/
- ["async-oneway"] void flushBatchRequests();
+ ["async-oneway"] void flushBatchRequests(CompressBatch compress);
/**
*