diff options
93 files changed, 1618 insertions, 978 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md index 8a9fa92ec08..d1a8814430b 100644 --- a/CHANGELOG-3.7.md +++ b/CHANGELOG-3.7.md @@ -17,6 +17,12 @@ These are the changes since Ice 3.6.3. ## General Changes +- The communicator and connection flushBatchRequests method now takes + an additional argument to specify whether or not the batch requests + to flush should be compressed. See the documentation of the + Ice::CompressBatch enumeration for the different options available + to specify whether or not the batch should be compressed. + - The UDP server endpoint now supports specifying `--interface *` to join the multicast group on using all the local interfaces. It's also now the default behavior if no `--interface` option is specified. diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index f401975a569..c82884f5ab9 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -304,92 +304,6 @@ protected: bool _synchronous; }; -// -// Class for handling the proxy's begin_ice_flushBatchRequest request. -// -class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase -{ -public: - - ProxyFlushBatchAsync(const Ice::ObjectPrxPtr&); - - virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); - virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); - - void invoke(const std::string&); - -private: - - int _batchRequestNum; -}; -typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr; - -// -// Class for handling the proxy's begin_ice_getConnection request. -// -class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase -{ -public: - - ProxyGetConnection(const Ice::ObjectPrxPtr&); - - virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); - virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); - - virtual Ice::ConnectionPtr getConnection() const; - - void invoke(const std::string&); -}; -typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr; - -// -// Class for handling Ice::Connection::begin_flushBatchRequests -// -class ICE_API ConnectionFlushBatchAsync : public OutgoingAsyncBase -{ -public: - - ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&); - - virtual Ice::ConnectionPtr getConnection() const; - - void invoke(const std::string&); - -private: - - const Ice::ConnectionIPtr _connection; -}; -typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr; - -// -// Class for handling Ice::Communicator::begin_flushBatchRequests -// -class ICE_API CommunicatorFlushBatchAsync : public OutgoingAsyncBase -{ -public: - - virtual ~CommunicatorFlushBatchAsync(); - - CommunicatorFlushBatchAsync(const InstancePtr&); - - void flushConnection(const Ice::ConnectionIPtr&); - void invoke(const std::string&); - -#ifdef ICE_CPP11_MAPPING - std::shared_ptr<CommunicatorFlushBatchAsync> shared_from_this() - { - return std::static_pointer_cast<CommunicatorFlushBatchAsync>(OutgoingAsyncBase::shared_from_this()); - } -#endif - -private: - - void check(bool); - - int _useCount; - InvocationObserver _observer; -}; - } namespace IceInternal diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index e996c1324ab..461c873ccdd 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -38,6 +38,49 @@ ICE_API extern const Context noExplicitContext; } +namespace IceInternal +{ + +// +// Class for handling the proxy's begin_ice_flushBatchRequest request. +// +class ICE_API ProxyFlushBatchAsync : public ProxyOutgoingAsyncBase +{ +public: + + ProxyFlushBatchAsync(const Ice::ObjectPrxPtr&); + + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); + + void invoke(const std::string&); + +private: + + int _batchRequestNum; +}; +typedef IceUtil::Handle<ProxyFlushBatchAsync> ProxyFlushBatchAsyncPtr; + +// +// Class for handling the proxy's begin_ice_getConnection request. +// +class ICE_API ProxyGetConnection : public ProxyOutgoingAsyncBase +{ +public: + + ProxyGetConnection(const Ice::ObjectPrxPtr&); + + virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool); + virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*); + + virtual Ice::ConnectionPtr getConnection() const; + + void invoke(const std::string&); +}; +typedef IceUtil::Handle<ProxyGetConnection> ProxyGetConnectionPtr; + +} + #ifdef ICE_CPP11_MAPPING // C++11 mapping namespace IceInternal diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp index 5acc66dbfa9..423b52b4e87 100644 --- a/cpp/src/Glacier2/RequestQueue.cpp +++ b/cpp/src/Glacier2/RequestQueue.cpp @@ -374,7 +374,7 @@ Glacier2::RequestQueue::flush() if(flushBatchRequests) { - Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback); + Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(Ice::BasedOnProxy, _flushCallback); if(!result->sentSynchronously() && !result->isCompleted()) { _pendingSend = true; diff --git a/cpp/src/Ice/BatchRequestQueue.cpp b/cpp/src/Ice/BatchRequestQueue.cpp index 5eb2758b00e..50b69a5436e 100644 --- a/cpp/src/Ice/BatchRequestQueue.cpp +++ b/cpp/src/Ice/BatchRequestQueue.cpp @@ -10,6 +10,7 @@ #include <Ice/BatchRequestQueue.h> #include <Ice/Instance.h> #include <Ice/Properties.h> +#include <Ice/Reference.h> using namespace std; using namespace Ice; @@ -34,7 +35,7 @@ public: virtual void enqueue() const { - _queue.enqueueBatchRequest(); + _queue.enqueueBatchRequest(_proxy); } virtual int @@ -70,6 +71,7 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram) _batchStream(instance.get(), Ice::currentProtocolEncoding), _batchStreamInUse(false), _batchStreamCanFlush(false), + _batchCompress(false), _batchRequestNum(0) { _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); @@ -101,7 +103,9 @@ BatchRequestQueue::prepareBatchRequest(OutputStream* os) } void -BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr& proxy, const std::string& operation) +BatchRequestQueue::finishBatchRequest(OutputStream* os, + const Ice::ObjectPrxPtr& proxy, + const std::string& operation) { // // No need for synchronization, no other threads are supposed @@ -135,6 +139,11 @@ BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr& } else { + bool compress; + if(proxy->_getReference()->getCompressOverride(compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.b.size(); ++_batchRequestNum; } @@ -170,7 +179,7 @@ BatchRequestQueue::abortBatchRequest(OutputStream* os) } int -BatchRequestQueue::swap(OutputStream* os) +BatchRequestQueue::swap(OutputStream* os, bool& compress) { Lock sync(*this); if(_batchRequestNum == 0) @@ -189,11 +198,13 @@ BatchRequestQueue::swap(OutputStream* os) int requestNum = _batchRequestNum; _batchStream.swap(*os); + compress = _batchCompress; // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); _batchMarker = _batchStream.b.size(); if(!lastRequest.empty()) @@ -231,9 +242,14 @@ BatchRequestQueue::waitStreamInUse(bool flush) } void -BatchRequestQueue::enqueueBatchRequest() +BatchRequestQueue::enqueueBatchRequest(const Ice::ObjectPrxPtr& proxy) { assert(_batchMarker < _batchStream.b.size()); + bool compress; + if(proxy->_getReference()->getCompressOverride(compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.b.size(); ++_batchRequestNum; } diff --git a/cpp/src/Ice/BatchRequestQueue.h b/cpp/src/Ice/BatchRequestQueue.h index 878b2a2ef6d..9bdc2b90790 100644 --- a/cpp/src/Ice/BatchRequestQueue.h +++ b/cpp/src/Ice/BatchRequestQueue.h @@ -33,12 +33,12 @@ public: void finishBatchRequest(Ice::OutputStream*, const Ice::ObjectPrxPtr&, const std::string&); void abortBatchRequest(Ice::OutputStream*); - int swap(Ice::OutputStream*); + int swap(Ice::OutputStream*, bool&); void destroy(const Ice::LocalException&); bool isEmpty(); - void enqueueBatchRequest(); + void enqueueBatchRequest(const Ice::ObjectPrxPtr&); private: @@ -52,6 +52,7 @@ private: Ice::OutputStream _batchStream; bool _batchStreamInUse; bool _batchStreamCanFlush; + bool _batchCompress; int _batchRequestNum; size_t _batchMarker; IceInternal::UniquePtr<Ice::LocalException> _exception; diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index c47a3299c19..9b9e00ce239 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -29,6 +29,168 @@ using namespace std; using namespace Ice; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } +#endif + +CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync() +{ + // Out of line to avoid weak vtable +} + +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : + OutgoingAsyncBase(instance) +{ + // + // _useCount is initialized to 1 to prevent premature callbacks. + // The caller must invoke ready() after all flush requests have + // been initiated. + // + _useCount = 1; +} + +void +CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con, Ice::CompressBatch compressBatch) +{ + class FlushBatch : public OutgoingAsyncBase + { + public: + + FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, + const InstancePtr& instance, + InvocationObserver& observer) : + OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) + { + } + + virtual bool + sent() + { + _childObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual bool + exception(const Exception& ex) + { + _childObserver.failed(ex.ice_id()); + _childObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual InvocationObserver& + getObserver() + { + return _observer; + } + + virtual bool handleSent(bool, bool) + { + return false; + } + + virtual bool handleException(const Ice::Exception&) + { + return false; + } + + virtual bool handleResponse(bool) + { + return false; + } + + virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + private: + + const CommunicatorFlushBatchAsyncPtr _outAsync; + InvocationObserver& _observer; + }; + + { + Lock sync(_m); + ++_useCount; + } + + try + { + OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer); + bool compress; + int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs(), compress); + if(batchRequestNum == 0) + { + flushBatch->sent(); + } + else + { + if(compressBatch == Ice::Yes) + { + compress = true; + } + else if(compressBatch == Ice::No) + { + compress = false; + } + con->sendAsyncRequest(flushBatch, compress, false, batchRequestNum); + } + } + catch(const LocalException&) + { + check(false); + throw; + } +} + +void +CommunicatorFlushBatchAsync::invoke(const string& operation, CompressBatch compressBatch) +{ + _observer.attach(_instance.get(), operation); + _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch); + _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch); + check(true); +} + +void +CommunicatorFlushBatchAsync::check(bool userThread) +{ + { + Lock sync(_m); + assert(_useCount > 0); + if(--_useCount > 0) + { + return; + } + } + + if(sentImpl(true)) + { + if(userThread) + { + _sentSynchronously = true; + invokeSent(); + } + else + { + invokeSentAsync(); + } + } +} + void Ice::CommunicatorI::destroy() ICE_NOEXCEPT { @@ -210,13 +372,15 @@ const ::std::string flushBatchRequests_name = "flushBatchRequests"; #ifdef ICE_CPP11_MAPPING void -Ice::CommunicatorI::flushBatchRequests() +Ice::CommunicatorI::flushBatchRequests(CompressBatch compress) { - Communicator::flushBatchRequestsAsync().get(); + Communicator::flushBatchRequestsAsync(compress).get(); } ::std::function<void()> -Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, function<void(bool)> sent) +Ice::CommunicatorI::flushBatchRequestsAsync(CompressBatch compress, + function<void(exception_ptr)> ex, + function<void(bool)> sent) { class CommunicatorFlushBatchLambda : public CommunicatorFlushBatchAsync, public LambdaInvoke { @@ -230,39 +394,44 @@ Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, fu } }; auto outAsync = make_shared<CommunicatorFlushBatchLambda>(_instance, ex, sent); - outAsync->invoke(flushBatchRequests_name); + outAsync->invoke(flushBatchRequests_name, compress); return [outAsync]() { outAsync->cancel(); }; } #else void -Ice::CommunicatorI::flushBatchRequests() +Ice::CommunicatorI::flushBatchRequests(CompressBatch compress) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compress)); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests() +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress) { - return _iceI_begin_flushBatchRequests(::IceInternal::dummyCallback, 0); + return _iceI_begin_flushBatchRequests(compress, ::IceInternal::dummyCallback, 0); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress, + const CallbackPtr& cb, + const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr& cb, +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress, + const Callback_Communicator_flushBatchRequestsPtr& cb, const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie) +Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress, + const IceInternal::CallbackBasePtr& cb, + const LocalObjectPtr& cookie) { class CommunicatorFlushBatchAsyncWithCallback : public CommunicatorFlushBatchAsync, public CallbackCompletion { @@ -294,7 +463,7 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBa }; CommunicatorFlushBatchAsyncPtr result = new CommunicatorFlushBatchAsyncWithCallback(this, _instance, cb, cookie); - result->invoke(flushBatchRequests_name); + result->invoke(flushBatchRequests_name, compress); return result; } diff --git a/cpp/src/Ice/CommunicatorI.h b/cpp/src/Ice/CommunicatorI.h index c1ef6279068..5c9035261f8 100644 --- a/cpp/src/Ice/CommunicatorI.h +++ b/cpp/src/Ice/CommunicatorI.h @@ -16,6 +16,41 @@ #include <Ice/Initialize.h> #include <Ice/Communicator.h> #include <Ice/CommunicatorAsync.h> +#include <Ice/OutgoingAsync.h> + +namespace IceInternal +{ + +// +// Class for handling Ice::Communicator::begin_flushBatchRequests +// +class CommunicatorFlushBatchAsync : public OutgoingAsyncBase +{ +public: + + virtual ~CommunicatorFlushBatchAsync(); + + CommunicatorFlushBatchAsync(const InstancePtr&); + + void flushConnection(const Ice::ConnectionIPtr&, Ice::CompressBatch); + void invoke(const std::string&, Ice::CompressBatch); + +#ifdef ICE_CPP11_MAPPING + std::shared_ptr<CommunicatorFlushBatchAsync> shared_from_this() + { + return std::static_pointer_cast<CommunicatorFlushBatchAsync>(OutgoingAsyncBase::shared_from_this()); + } +#endif + +private: + + void check(bool); + + int _useCount; + InvocationObserver _observer; +}; + +} namespace Ice { @@ -68,16 +103,18 @@ public: virtual ValueFactoryManagerPtr getValueFactoryManager() const; - virtual void flushBatchRequests(); + virtual void flushBatchRequests(CompressBatch); #ifdef ICE_CPP11_MAPPING virtual ::std::function<void()> - flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>, + flushBatchRequestsAsync(CompressBatch, + ::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr); #else - virtual AsyncResultPtr begin_flushBatchRequests(); - virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0); - virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr&, + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch); + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0); + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, + const Callback_Communicator_flushBatchRequestsPtr&, const LocalObjectPtr& = 0); virtual void end_flushBatchRequests(const AsyncResultPtr&); @@ -113,7 +150,9 @@ private: friend ICE_API ::IceUtil::TimerPtr IceInternal::getInstanceTimer(const ::Ice::CommunicatorPtr&); #ifndef ICE_CPP11_MAPPING - AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch, + const IceInternal::CallbackBasePtr&, + const LocalObjectPtr&); #endif const ::IceInternal::InstancePtr _instance; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 755ab969003..0234c1c33dd 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -25,6 +25,7 @@ #include <Ice/LocalException.h> #include <Ice/Functional.h> #include <Ice/OutgoingAsync.h> +#include <Ice/CommunicatorI.h> #include <IceUtil/Random.h> #include <iterator> @@ -223,7 +224,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } void -IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, + bool hasMore, Ice::EndpointSelectionType selType, const CreateConnectionCallbackPtr& callback) { @@ -335,7 +337,8 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad } void -IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) +IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, + Ice::CompressBatch compress) { list<ConnectionIPtr> c; @@ -355,7 +358,7 @@ IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const Communicat { try { - outAsync->flushConnection(*p); + outAsync->flushConnection(*p, compress); } catch(const LocalException&) { @@ -1276,7 +1279,8 @@ IceInternal::IncomingConnectionFactory::connections() const } void -IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) +IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, + Ice::CompressBatch compress) { list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. @@ -1284,7 +1288,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat { try { - outAsync->flushConnection(*p); + outAsync->flushConnection(*p, compress); } catch(const LocalException&) { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 69331e04f77..b8058ab593e 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -66,7 +66,7 @@ public: void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, const CreateConnectionCallbackPtr&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch); OutgoingConnectionFactory(const Ice::CommunicatorPtr&, const InstancePtr&); virtual ~OutgoingConnectionFactory(); @@ -191,7 +191,7 @@ public: EndpointIPtr endpoint() const; std::list<Ice::ConnectionIPtr> connections() const; - void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch); // // Operations from EventHandler diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 23388d399c6..598f4c983d3 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -133,6 +133,25 @@ private: const bool _close; }; +// +// Class for handling Ice::Connection::begin_flushBatchRequests +// +class ConnectionFlushBatchAsync : public OutgoingAsyncBase +{ +public: + + ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&); + + virtual Ice::ConnectionPtr getConnection() const; + + void invoke(const std::string&, Ice::CompressBatch); + +private: + + const Ice::ConnectionIPtr _connection; +}; +typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr; + ConnectionState connectionStateMap[] = { ConnectionStateValidating, // StateNotInitialized ConnectionStateValidating, // StateNotValidated @@ -146,6 +165,72 @@ ConnectionState connectionStateMap[] = { } +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : + OutgoingAsyncBase(instance), _connection(connection) +{ +} + +ConnectionPtr +ConnectionFlushBatchAsync::getConnection() const +{ + return _connection; +} + +void +ConnectionFlushBatchAsync::invoke(const string& operation, Ice::CompressBatch compressBatch) +{ + _observer.attach(_instance.get(), operation); + try + { + AsyncStatus status; + bool compress; + int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os, compress); + if(batchRequestNum == 0) + { + status = AsyncStatusSent; + if(sent()) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } + } + else + { + if(compressBatch == Ice::Yes) + { + compress = true; + } + else if(compressBatch == Ice::No) + { + compress = false; + } + status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, batchRequestNum); + } + + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); + } + } + } + catch(const RetryException& ex) + { + if(exception(*ex.get())) + { + invokeExceptionAsync(); + } + } + catch(const Exception& ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } +} + Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) { } @@ -698,13 +783,14 @@ Ice::ConnectionI::getBatchRequestQueue() const #ifdef ICE_CPP11_MAPPING void -Ice::ConnectionI::flushBatchRequests() +Ice::ConnectionI::flushBatchRequests(CompressBatch compress) { - Connection::flushBatchRequestsAsync().get(); + Connection::flushBatchRequestsAsync(compress).get(); } std::function<void()> -Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex, +Ice::ConnectionI::flushBatchRequestsAsync(CompressBatch compress, + ::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent) { class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke @@ -720,37 +806,42 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ } }; auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent); - outAsync->invoke(flushBatchRequests_name); + outAsync->invoke(flushBatchRequests_name, compress); return [outAsync]() { outAsync->cancel(); }; } #else void -Ice::ConnectionI::flushBatchRequests() +Ice::ConnectionI::flushBatchRequests(CompressBatch compress) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compress)); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests() +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress) { - return _iceI_begin_flushBatchRequests(dummyCallback, 0); + return _iceI_begin_flushBatchRequests(compress, dummyCallback, 0); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress, + const CallbackPtr& cb, + const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb, +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress, + const Callback_Connection_flushBatchRequestsPtr& cb, const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) +Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) { class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion { @@ -791,8 +882,12 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, cons Ice::ConnectionPtr _connection; }; - ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie); - result->invoke(flushBatchRequests_name); + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, + _communicator, + _instance, + cb, + cookie); + result->invoke(flushBatchRequests_name, compress); return result; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 7fb3781e880..bd6398362df 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -175,16 +175,18 @@ public: IceInternal::BatchRequestQueuePtr getBatchRequestQueue() const; - virtual void flushBatchRequests(); + virtual void flushBatchRequests(CompressBatch); #ifdef ICE_CPP11_MAPPING virtual std::function<void()> - flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>, - ::std::function<void(bool)> = nullptr); + flushBatchRequestsAsync(CompressBatch, + ::std::function<void(::std::exception_ptr)>, + ::std::function<void(bool)> = nullptr); #else - virtual AsyncResultPtr begin_flushBatchRequests(); - virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0); - virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr&, + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch); + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0); + virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, + const Callback_Connection_flushBatchRequestsPtr&, const LocalObjectPtr& = 0); virtual void end_flushBatchRequests(const AsyncResultPtr&); @@ -320,7 +322,9 @@ private: void reap(); #ifndef ICE_CPP11_MAPPING - AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch, + const IceInternal::CallbackBasePtr&, + const LocalObjectPtr&); AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); #endif diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp index 47398bc8720..2e9dd66ff0c 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.cpp +++ b/cpp/src/Ice/ObjectAdapterFactory.cpp @@ -134,7 +134,7 @@ IceInternal::ObjectAdapterFactory::updateObservers(void (ObjectAdapterI::*fn)()) adapters = _adapters; } #ifdef ICE_CPP11_MAPPING - for_each(adapters.begin(), adapters.end(), + for_each(adapters.begin(), adapters.end(), [fn](const ObjectAdapterIPtr& adapter) { (adapter.get() ->* fn)(); @@ -231,7 +231,8 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a } void -IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) const +IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, + CompressBatch compressBatch) const { list<ObjectAdapterIPtr> adapters; { @@ -242,7 +243,7 @@ IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlu for(list<ObjectAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { - (*p)->flushAsyncBatchRequests(outAsync); + (*p)->flushAsyncBatchRequests(outAsync, compressBatch); } } diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h index 93a6aede7df..523ec46b304 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.h +++ b/cpp/src/Ice/ObjectAdapterFactory.h @@ -38,7 +38,7 @@ public: ::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrxPtr&); ::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrxPtr&); void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&) const; + void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, ::Ice::CompressBatch) const; ObjectAdapterFactory(const InstancePtr&, const ::Ice::CommunicatorPtr&); virtual ~ObjectAdapterFactory(); diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index ac60baa4a0c..e7c6e529095 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -9,6 +9,7 @@ #include <Ice/UUID.h> #include <Ice/ObjectAdapterI.h> +#include <Ice/CommunicatorI.h> #include <Ice/ObjectAdapterFactory.h> #include <Ice/Instance.h> #include <Ice/Proxy.h> @@ -786,7 +787,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrxPtr& proxy) const } void -Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) +Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, CompressBatch compress) { vector<IncomingConnectionFactoryPtr> f; { @@ -796,7 +797,7 @@ Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPt for(vector<IncomingConnectionFactoryPtr>::const_iterator p = f.begin(); p != f.end(); ++p) { - (*p)->flushAsyncBatchRequests(outAsync); + (*p)->flushAsyncBatchRequests(outAsync, compress); } } diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index 855d836f471..d86ce43235c 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -92,7 +92,7 @@ public: bool isLocal(const ObjectPrxPtr&) const; - void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&); + void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&, CompressBatch); void updateConnectionObservers(); void updateThreadObservers(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index bd82a31e9a5..15a2d819260 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -29,7 +29,6 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } #endif const unsigned char OutgoingAsyncBase::OK = 0x1; @@ -1181,298 +1180,6 @@ OutgoingAsync::throwUserException() #endif -ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy) -{ -} - -AsyncStatus -ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool) -{ - if(_batchRequestNum == 0) - { - if(sent()) - { - return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); - } - else - { - return AsyncStatusSent; - } - } - _cachedConnection = connection; - return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum); -} - -AsyncStatus -ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) -{ - if(_batchRequestNum == 0) - { - if(sent()) - { - return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); - } - else - { - return AsyncStatusSent; - } - } - return handler->invokeAsyncRequest(this, _batchRequestNum, false); -} - -void -ProxyFlushBatchAsync::invoke(const string& operation) -{ - checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol())); - _observer.attach(_proxy, operation, ::Ice::noExplicitContext); - _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os); - invokeImpl(true); // userThread = true -} - -ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx) -{ -} - -AsyncStatus -ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) -{ - _cachedConnection = connection; - if(responseImpl(true)) - { - invokeResponseAsync(); - } - return AsyncStatusSent; -} - -AsyncStatus -ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) -{ - if(responseImpl(true)) - { - invokeResponseAsync(); - } - return AsyncStatusSent; -} - -Ice::ConnectionPtr -ProxyGetConnection::getConnection() const -{ - return _cachedConnection; -} - -void -ProxyGetConnection::invoke(const string& operation) -{ - _observer.attach(_proxy, operation, ::Ice::noExplicitContext); - invokeImpl(true); // userThread = true -} - -ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : - OutgoingAsyncBase(instance), _connection(connection) -{ -} - -ConnectionPtr -ConnectionFlushBatchAsync::getConnection() const -{ - return _connection; -} - -void -ConnectionFlushBatchAsync::invoke(const string& operation) -{ - _observer.attach(_instance.get(), operation); - try - { - AsyncStatus status; - int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); - if(batchRequestNum == 0) - { - status = AsyncStatusSent; - if(sent()) - { - status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); - } - } - else - { - status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum); - } - - if(status & AsyncStatusSent) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) - { - invokeSent(); - } - } - } - catch(const RetryException& ex) - { - if(exception(*ex.get())) - { - invokeExceptionAsync(); - } - } - catch(const Exception& ex) - { - if(exception(ex)) - { - invokeExceptionAsync(); - } - } -} - -CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync() -{ - // Out of line to avoid weak vtable -} - -CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : - OutgoingAsyncBase(instance) -{ - // - // _useCount is initialized to 1 to prevent premature callbacks. - // The caller must invoke ready() after all flush requests have - // been initiated. - // - _useCount = 1; -} - -void -CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con) -{ - class FlushBatch : public OutgoingAsyncBase - { - public: - - FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, - const InstancePtr& instance, - InvocationObserver& observer) : - OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) - { - } - - virtual bool - sent() - { - _childObserver.detach(); - _outAsync->check(false); - return false; - } - - virtual bool - exception(const Exception& ex) - { - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - _outAsync->check(false); - return false; - } - - virtual InvocationObserver& - getObserver() - { - return _observer; - } - - virtual bool handleSent(bool, bool) - { - return false; - } - - virtual bool handleException(const Ice::Exception&) - { - return false; - } - - virtual bool handleResponse(bool) - { - return false; - } - - virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const - { - assert(false); - } - - virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const - { - assert(false); - } - - virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const - { - assert(false); - } - - private: - - const CommunicatorFlushBatchAsyncPtr _outAsync; - InvocationObserver& _observer; - }; - - { - Lock sync(_m); - ++_useCount; - } - - try - { - OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer); - int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs()); - if(batchRequestNum == 0) - { - flushBatch->sent(); - } - else - { - con->sendAsyncRequest(flushBatch, false, false, batchRequestNum); - } - } - catch(const LocalException&) - { - check(false); - throw; - } -} - -void -CommunicatorFlushBatchAsync::invoke(const string& operation) -{ - _observer.attach(_instance.get(), operation); - _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); - _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS); - check(true); -} - -void -CommunicatorFlushBatchAsync::check(bool userThread) -{ - { - Lock sync(_m); - assert(_useCount > 0); - if(--_useCount > 0) - { - return; - } - } - - if(sentImpl(true)) - { - if(userThread) - { - _sentSynchronously = true; - invokeSent(); - } - else - { - invokeSentAsync(); - } - } -} - #ifdef ICE_CPP11_MAPPING bool diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 4c36e74e369..2535c05c1e4 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -14,6 +14,7 @@ #include <Ice/ObjectAdapterFactory.h> #include <Ice/OutgoingAsync.h> #include <Ice/Reference.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/EndpointI.h> #include <Ice/Instance.h> #include <Ice/RouterInfo.h> @@ -48,6 +49,93 @@ const string ice_flushBatchRequests_name = "ice_flushBatchRequests"; } +ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy) +{ +} + +AsyncStatus +ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool) +{ + if(_batchRequestNum == 0) + { + if(sent()) + { + return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); + } + else + { + return AsyncStatusSent; + } + } + _cachedConnection = connection; + return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum); +} + +AsyncStatus +ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler) +{ + if(_batchRequestNum == 0) + { + if(sent()) + { + return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); + } + else + { + return AsyncStatusSent; + } + } + return handler->invokeAsyncRequest(this, _batchRequestNum, false); +} + +void +ProxyFlushBatchAsync::invoke(const string& operation) +{ + checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol())); + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); + bool compress; // Ignore for proxy flushBatchRequests + _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os, compress); + invokeImpl(true); // userThread = true +} + +ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx) +{ +} + +AsyncStatus +ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool) +{ + _cachedConnection = connection; + if(responseImpl(true)) + { + invokeResponseAsync(); + } + return AsyncStatusSent; +} + +AsyncStatus +ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) +{ + if(responseImpl(true)) + { + invokeResponseAsync(); + } + return AsyncStatusSent; +} + +Ice::ConnectionPtr +ProxyGetConnection::getConnection() const +{ + return _cachedConnection; +} + +void +ProxyGetConnection::invoke(const string& operation) +{ + _observer.attach(_proxy, operation, ::Ice::noExplicitContext); + invokeImpl(true); // userThread = true +} + #ifdef ICE_CPP11_MAPPING // C++11 mapping namespace Ice diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp index 188ad33e5ee..31cd7631479 100644 --- a/cpp/src/Ice/Reference.cpp +++ b/cpp/src/Ice/Reference.cpp @@ -174,6 +174,25 @@ IceInternal::Reference::changeCompress(bool newCompress) const return r; } +bool +IceInternal::Reference::getCompressOverride(bool& compress) const +{ + DefaultsAndOverridesPtr defaultsAndOverrides = getInstance()->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else if(_overrideCompress) + { + compress = _compress; + } + else + { + return false; + } + return true; +} + Int Reference::hash() const { @@ -791,7 +810,7 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c _fixedConnection->throwException(); // Throw in case our connection is already destroyed. - bool compress; + bool compress = false; if(defaultsAndOverrides->overrideCompress) { compress = defaultsAndOverrides->overrideCompressValue; @@ -800,10 +819,6 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c { compress = _compress; } - else - { - compress = _fixedConnection->endpoint()->compress(); - } ReferencePtr ref = const_cast<FixedReference*>(this); return proxy->_setRequestHandler(ICE_MAKE_SHARED(ConnectionRequestHandler, ref, _fixedConnection, compress)); diff --git a/cpp/src/Ice/Reference.h b/cpp/src/Ice/Reference.h index a48a61c66e4..7a19ab3525f 100644 --- a/cpp/src/Ice/Reference.h +++ b/cpp/src/Ice/Reference.h @@ -41,7 +41,7 @@ class Reference : public IceUtil::Shared { public: - class GetConnectionCallback + class GetConnectionCallback #ifndef ICE_CPP11_MAPPING : public virtual IceUtil::Shared #endif @@ -116,6 +116,8 @@ public: int hash() const; // Conceptually const. + bool getCompressOverride(bool&) const; + // // Utility methods. // diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp index 7beb1fb7371..0173509b925 100644 --- a/cpp/src/slice2cpp/Gen.cpp +++ b/cpp/src/slice2cpp/Gen.cpp @@ -6620,7 +6620,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p) { vector<string> paramsDeclAMI; vector<string> outParamsDeclAMI; - + vector<string> paramsArgAMI; ParamDeclList paramList = p->parameters(); for(ParamDeclList::const_iterator r = paramList.begin(); r != paramList.end(); ++r) { @@ -6632,6 +6632,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p) { typeString = inputTypeToString((*r)->type(), (*r)->optional(), metaData, typeCtx); paramsDeclAMI.push_back(typeString + ' ' + paramName); + paramsArgAMI.push_back(paramName); } } @@ -6663,11 +6664,11 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p) H << nl << name << "Async("; H.useCurrentPosAsIndent(); - for(vector<string>::const_iterator i = paramsDeclAMI.begin(); i != paramsDeclAMI.end(); ++i) + for(vector<string>::const_iterator i = paramsArgAMI.begin(); i != paramsArgAMI.end(); ++i) { H << *i << ","; } - if(!paramsDeclAMI.empty()) + if(!paramsArgAMI.empty()) { H << nl; } diff --git a/cpp/src/slice2objc/Gen.cpp b/cpp/src/slice2objc/Gen.cpp index 80b5609417e..7ad7d9883e6 100644 --- a/cpp/src/slice2objc/Gen.cpp +++ b/cpp/src/slice2objc/Gen.cpp @@ -1214,15 +1214,29 @@ Slice::Gen::TypesVisitor::visitOperation(const OperationPtr& p) if(cl->isLocal() && (cl->hasMetaData("async-oneway") || p->hasMetaData("async-oneway"))) { - // TODO: add support for parameters when needed. - _H << nl << "-(id<ICEAsyncResult>) begin_" << name << deprecateSymbol << ";"; - _H << nl << "-(id<ICEAsyncResult>) begin_" << name << ":(void(^)(ICEException*))exception" - << deprecateSymbol << ";"; - _H << nl << "-(id<ICEAsyncResult>) begin_" << name - << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent" - << deprecateSymbol << ";"; - _H << nl << "-(void) end_" << name << ":(id<ICEAsyncResult>)result" - << deprecateSymbol << ";"; + string marshalParams = getMarshalParams(p); + string unmarshalParams = getUnmarshalParams(p); + + _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams << deprecateSymbol << ";"; + _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams; + if(!marshalParams.empty()) + { + _H << " exception"; + } + _H << ":(void(^)(ICEException*))exception" << deprecateSymbol << ";"; + _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams; + if(!marshalParams.empty()) + { + _H << " exception"; + } + _H << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent" << deprecateSymbol << ";"; + + _H << nl << "-(void) end_" << name << unmarshalParams; + if(!unmarshalParams.empty()) + { + _H << " result"; + } + _H << ":(id<ICEAsyncResult>)result" << deprecateSymbol << ";"; } } diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index 3366c4fdc7e..d285f0498ab 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -1901,6 +1901,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; b1->ice_getConnection()->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](const exception_ptr& ex) { promise.set_exception(ex); @@ -1920,6 +1921,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) auto id = this_thread::get_id(); promise<void> promise; p->ice_getConnection()->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](const exception_ptr& ex) { promise.set_exception(ex); @@ -1943,6 +1945,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; b1->ice_getConnection()->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](exception_ptr ex) { promise.set_exception(move(ex)); @@ -1977,7 +1980,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); - communicator->flushBatchRequestsAsync().get(); + communicator->flushBatchRequestsAsync(Ice::CompressBatch::BasedOnProxy).get(); test(p->waitForBatch(2)); } @@ -1994,6 +1997,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; auto id = this_thread::get_id(); communicator->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](exception_ptr ex) { promise.set_exception(move(ex)); @@ -2022,6 +2026,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; auto id = this_thread::get_id(); communicator->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](exception_ptr ex) { promise.set_exception(move(ex)); @@ -2056,6 +2061,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; auto id = this_thread::get_id(); communicator->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](exception_ptr ex) { promise.set_exception(move(ex)); @@ -2093,6 +2099,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) promise<void> promise; auto id = this_thread::get_id(); communicator->flushBatchRequestsAsync( + Ice::CompressBatch::BasedOnProxy, [&](exception_ptr ex) { promise.set_exception(move(ex)); @@ -2287,7 +2294,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // // This test requires two threads in the server's thread pool: one will block in sleep() and the other // will process the CloseConnection message. - // + // p->ice_ping(); auto con = p->ice_getConnection(); auto s = make_shared<promise<void>>(); @@ -2346,7 +2353,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // // Local case: start a lengthy operation and then close the connection forcefully on the client side. // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. - // + // p->ice_ping(); auto con = p->ice_getConnection(); auto s = make_shared<promise<void>>(); @@ -3208,7 +3215,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( + Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); @@ -3217,7 +3224,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // Ensure it also works with a twoway proxy cb = new FlushCallback(); - r = p->ice_getConnection()->begin_flushBatchRequests( + r = p->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); @@ -3234,7 +3241,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(cookie); - b1->ice_getConnection()->begin_flushBatchRequests( + b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); cb->check(); test(p->waitForBatch(2)); @@ -3251,7 +3258,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); - Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( + Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync)); cb->check(); test(!r->isSent()); @@ -3270,7 +3277,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); - b1->ice_getConnection()->begin_flushBatchRequests( + b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie); cb->check(); test(p->opBatchCount() == 0); @@ -3286,7 +3293,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( + Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Connection_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); test(r->isSent()); @@ -3304,7 +3311,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(cookie); - b1->ice_getConnection()->begin_flushBatchRequests( + b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Connection_flushBatchRequests(cb, &FlushCallback::exceptionWC, &FlushCallback::sentWC), cookie); cb->check(); @@ -3322,7 +3329,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); - Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( + Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception, &FlushExCallback::sent)); cb->check(); @@ -3342,7 +3349,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); - b1->ice_getConnection()->begin_flushBatchRequests( + b1->ice_getConnection()->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC, &FlushExCallback::sentWC), cookie); cb->check(); @@ -3365,7 +3372,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); @@ -3383,7 +3390,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(cookie); - communicator->begin_flushBatchRequests( + communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); cb->check(); test(p->waitForBatch(2)); @@ -3400,7 +3407,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); // Exceptions are ignored! @@ -3419,7 +3426,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); - communicator->begin_flushBatchRequests( + communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); cb->check(); test(p->opBatchCount() == 0); @@ -3442,7 +3449,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->opBatch(); b2->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); @@ -3470,7 +3477,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); // Exceptions are ignored! @@ -3498,7 +3505,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); cb->check(); test(r->isSent()); // Exceptions are ignored! @@ -3516,7 +3523,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); @@ -3535,7 +3542,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->opBatch(); FlushCallbackPtr cb = new FlushCallback(cookie); - communicator->begin_flushBatchRequests( + communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC, &FlushCallback::sentWC), cookie); cb->check(); @@ -3553,7 +3560,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); @@ -3573,7 +3580,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); - communicator->begin_flushBatchRequests( + communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC, &FlushCallback::sentWC), cookie); cb->check(); @@ -3598,7 +3605,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->opBatch(); b2->opBatch(); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); @@ -3627,7 +3634,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->opBatch(); b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); @@ -3656,7 +3663,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); - Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( + Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy, Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, &FlushCallback::sent)); cb->check(); @@ -3775,7 +3782,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Ice::ConnectionPtr con = p->ice_getConnection(); p2 = p->ice_batchOneway(); p2->ice_ping(); - r = con->begin_flushBatchRequests(); + r = con->begin_flushBatchRequests(Ice::BasedOnProxy); test(r->getConnection() == con); test(r->getCommunicator() == communicator); test(!r->getProxy()); // Expected @@ -3786,7 +3793,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // p2 = p->ice_batchOneway(); p2->ice_ping(); - r = communicator->begin_flushBatchRequests(); + r = communicator->begin_flushBatchRequests(Ice::BasedOnProxy); test(!r->getConnection()); // Expected test(r->getCommunicator() == communicator); test(!r->getProxy()); // Expected @@ -3961,7 +3968,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // // This test requires two threads in the server's thread pool: one will block in sleep() and the other // will process the CloseConnection message. - // + // p->ice_ping(); Ice::ConnectionPtr con = p->ice_getConnection(); Ice::AsyncResultPtr r = p->begin_sleep(100); @@ -4009,7 +4016,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) // // Local case: start a lengthy operation and then close the connection forcefully on the client side. // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. - // + // p->ice_ping(); Ice::ConnectionPtr con = p->ice_getConnection(); Ice::AsyncResultPtr r = p->begin_sleep(100); diff --git a/cpp/test/Ice/operations/BatchOneways.cpp b/cpp/test/Ice/operations/BatchOneways.cpp index e3d261cf7e3..c7466821faa 100644 --- a/cpp/test/Ice/operations/BatchOneways.cpp +++ b/cpp/test/Ice/operations/BatchOneways.cpp @@ -89,6 +89,11 @@ batchOneways(const Test::MyClassPrxPtr& p) Test::MyClassPrxPtr batch = ICE_UNCHECKED_CAST(Test::MyClassPrx, p->ice_batchOneway()); batch->ice_flushBatchRequests(); // Empty flush + if(batch->ice_getConnection()) + { + batch->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy); + } + batch->ice_getCommunicator()->flushBatchRequests(Ice::BasedOnProxy); int i; p->opByteSOnewayCallCount(); // Reset the call count @@ -154,11 +159,10 @@ batchOneways(const Test::MyClassPrxPtr& p) BatchRequestInterceptorIPtr interceptor = ICE_MAKE_SHARED(BatchRequestInterceptorI); #if defined(ICE_CPP11_MAPPING) - initData.batchRequestInterceptor = - [=](const Ice::BatchRequest& request, int count, int size) - { - interceptor->enqueue(request, count, size); - }; + initData.batchRequestInterceptor = [=](const Ice::BatchRequest& request, int count, int size) + { + interceptor->enqueue(request, count, size); + }; #else initData.batchRequestInterceptor = interceptor; #endif @@ -195,4 +199,38 @@ batchOneways(const Test::MyClassPrxPtr& p) ic->destroy(); } + if(batch->ice_getConnection() && + p->ice_getCommunicator()->getProperties()->getProperty("Ice.Override.Compress") == "") + { + Ice::ObjectPrxPtr prx = batch->ice_getConnection()->createProxy(batch->ice_getIdentity())->ice_batchOneway(); + + Test::MyClassPrxPtr batch1 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_compress(false)); + Test::MyClassPrxPtr batch2 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_compress(true)); + Test::MyClassPrxPtr batch3 = ICE_UNCHECKED_CAST(Test::MyClassPrx, prx->ice_identity(identity)); + + batch1->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->ice_getConnection()->flushBatchRequests(Ice::Yes); + + batch2->opByteSOneway(bs1); + batch2->opByteSOneway(bs1); + batch2->opByteSOneway(bs1); + batch1->ice_getConnection()->flushBatchRequests(Ice::No); + + batch1->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy); + + batch1->opByteSOneway(bs1); + batch2->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy); + + batch1->opByteSOneway(bs1); + batch3->opByteSOneway(bs1); + batch1->opByteSOneway(bs1); + batch1->ice_getConnection()->flushBatchRequests(Ice::BasedOnProxy); + } } diff --git a/csharp/src/Ice/BatchRequestQueue.cs b/csharp/src/Ice/BatchRequestQueue.cs index 5507e879933..a2d3ee13857 100644 --- a/csharp/src/Ice/BatchRequestQueue.cs +++ b/csharp/src/Ice/BatchRequestQueue.cs @@ -27,7 +27,7 @@ namespace IceInternal public void enqueue() { - _queue.enqueueBatchRequest(); + _queue.enqueueBatchRequest(_proxy); } public Ice.ObjectPrx getProxy() @@ -118,6 +118,11 @@ namespace IceInternal } else { + bool compress; + if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -150,12 +155,13 @@ namespace IceInternal } public int - swap(Ice.OutputStream os) + swap(Ice.OutputStream os, out bool compress) { lock(this) { if(_batchRequestNum == 0) { + compress = false; return 0; } @@ -172,12 +178,14 @@ namespace IceInternal } int requestNum = _batchRequestNum; + compress = _batchCompress; _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) @@ -221,9 +229,14 @@ namespace IceInternal } } - internal void enqueueBatchRequest() + internal void enqueueBatchRequest(Ice.ObjectPrx proxy) { Debug.Assert(_batchMarker < _batchStream.size()); + bool compress; + if(((Ice.ObjectPrxHelperBase)proxy).iceReference().getCompressOverride(out compress)) + { + _batchCompress |= compress; + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -234,6 +247,7 @@ namespace IceInternal private bool _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private bool _batchCompress; private BatchRequestI _request; private Ice.LocalException _exception; private int _maxSize; diff --git a/csharp/src/Ice/CommunicatorI.cs b/csharp/src/Ice/CommunicatorI.cs index 73a9d0b0dbb..1528a116a06 100644 --- a/csharp/src/Ice/CommunicatorI.cs +++ b/csharp/src/Ice/CommunicatorI.cs @@ -163,23 +163,24 @@ namespace Ice return _instance.pluginManager(); } - public void flushBatchRequests() + public void flushBatchRequests(Ice.CompressBatch compressBatch) { - flushBatchRequestsAsync().Wait(); + flushBatchRequestsAsync(compressBatch).Wait(); } - public Task flushBatchRequestsAsync(IProgress<bool> progress = null, + public Task flushBatchRequestsAsync(Ice.CompressBatch compressBatch, + IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) { var completed = new FlushBatchTaskCompletionCallback(progress, cancel); var outgoing = new CommunicatorFlushBatchAsync(_instance, completed); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return completed.Task; } - public AsyncResult begin_flushBatchRequests() + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequests(null, null); + return begin_flushBatchRequests(compressBatch, null, null); } private const string _flushBatchRequests_name = "flushBatchRequests"; @@ -214,11 +215,15 @@ namespace Ice } }; - public AsyncResult begin_flushBatchRequests(AsyncCallback cb, object cookie) + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, AsyncCallback cb, object cookie) { - var result = new CommunicatorFlushBatchCompletionCallback(this, _instance, _flushBatchRequests_name, cookie, cb); + var result = new CommunicatorFlushBatchCompletionCallback(this, + _instance, + _flushBatchRequests_name, + cookie, + cb); var outgoing = new CommunicatorFlushBatchAsync(_instance, result); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return result; } diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs index 936f2b3da57..51e9c992e1e 100644 --- a/csharp/src/Ice/ConnectionFactory.cs +++ b/csharp/src/Ice/ConnectionFactory.cs @@ -255,7 +255,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { ICollection<Ice.ConnectionI> c = new List<Ice.ConnectionI>(); @@ -280,7 +280,7 @@ namespace IceInternal { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(Ice.LocalException) { @@ -1292,7 +1292,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { // // connections() is synchronized, no need to synchronize here. @@ -1301,7 +1301,7 @@ namespace IceInternal { try { - outAsync.flushConnection(connection); + outAsync.flushConnection(connection, compressBatch); } catch(Ice.LocalException) { diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 63a5148742a..a1cbdde8fcf 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -473,9 +473,9 @@ namespace Ice return _batchRequestQueue; } - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - flushBatchRequestsAsync().Wait(); + flushBatchRequestsAsync(compressBatch).Wait(); } private class ConnectionFlushBatchCompletionCallback : AsyncResultCompletionCallback @@ -517,21 +517,24 @@ namespace Ice private Connection _connection; } - public Task flushBatchRequestsAsync(IProgress<bool> progress = null, + public Task flushBatchRequestsAsync(CompressBatch compressBatch, + IProgress<bool> progress = null, CancellationToken cancel = new CancellationToken()) { var completed = new FlushBatchTaskCompletionCallback(progress, cancel); var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return completed.Task; } - public AsyncResult begin_flushBatchRequests(AsyncCallback cb = null, object cookie = null) + public AsyncResult begin_flushBatchRequests(CompressBatch compressBatch, + AsyncCallback cb = null, + object cookie = null) { var result = new ConnectionFlushBatchCompletionCallback(this, _communicator, _instance, _flushBatchRequests_name, cookie, cb); var outgoing = new ConnectionFlushBatchAsync(this, _instance, result); - outgoing.invoke(_flushBatchRequests_name); + outgoing.invoke(_flushBatchRequests_name, compressBatch); return result; } diff --git a/csharp/src/Ice/ObjectAdapterFactory.cs b/csharp/src/Ice/ObjectAdapterFactory.cs index 17ccfa91b1c..0abdea67a62 100644 --- a/csharp/src/Ice/ObjectAdapterFactory.cs +++ b/csharp/src/Ice/ObjectAdapterFactory.cs @@ -28,10 +28,10 @@ namespace IceInternal } adapters = new List<Ice.ObjectAdapterI>(_adapters); - + _instance = null; _communicator = null; - + System.Threading.Monitor.PulseAll(this); } @@ -44,7 +44,7 @@ namespace IceInternal adapter.deactivate(); } } - + public void waitForShutdown() { List<Ice.ObjectAdapterI> adapters; @@ -57,7 +57,7 @@ namespace IceInternal { System.Threading.Monitor.Wait(this); } - + adapters = new List<Ice.ObjectAdapterI>(_adapters); } @@ -110,13 +110,13 @@ namespace IceInternal { adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { adapter.updateConnectionObservers(); } } - + public void updateThreadObservers() { @@ -125,13 +125,13 @@ namespace IceInternal { adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { adapter.updateThreadObservers(); } } - + public Ice.ObjectAdapter createObjectAdapter(string name, Ice.RouterPrx router) { lock(this) @@ -140,7 +140,7 @@ namespace IceInternal { throw new Ice.CommunicatorDestroyedException(); } - + Ice.ObjectAdapterI adapter = null; if(name.Length == 0) { @@ -163,7 +163,7 @@ namespace IceInternal return adapter; } } - + public Ice.ObjectAdapter findObjectAdapter(Ice.ObjectPrx proxy) { List<Ice.ObjectAdapterI> adapters; @@ -173,10 +173,10 @@ namespace IceInternal { return null; } - + adapters = new List<Ice.ObjectAdapterI>(_adapters); } - + foreach(Ice.ObjectAdapterI adapter in adapters) { try @@ -209,7 +209,7 @@ namespace IceInternal } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { List<Ice.ObjectAdapterI> adapters; lock(this) @@ -219,10 +219,10 @@ namespace IceInternal foreach(Ice.ObjectAdapterI adapter in adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } - + // // Only for use by Instance. // @@ -233,7 +233,7 @@ namespace IceInternal _adapterNamesInUse = new HashSet<string>(); _adapters = new List<Ice.ObjectAdapterI>(); } - + private Instance _instance; private Ice.Communicator _communicator; private HashSet<string> _adapterNamesInUse; diff --git a/csharp/src/Ice/ObjectAdapterI.cs b/csharp/src/Ice/ObjectAdapterI.cs index 5929b46fd3c..01892c30401 100644 --- a/csharp/src/Ice/ObjectAdapterI.cs +++ b/csharp/src/Ice/ObjectAdapterI.cs @@ -688,7 +688,7 @@ namespace Ice } } - public void flushAsyncBatchRequests(CommunicatorFlushBatchAsync outAsync) + public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync) { List<IncomingConnectionFactory> f; lock(this) @@ -698,7 +698,7 @@ namespace Ice foreach(IncomingConnectionFactory factory in f) { - factory.flushAsyncBatchRequests(outAsync); + factory.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/csharp/src/Ice/OutgoingAsync.cs b/csharp/src/Ice/OutgoingAsync.cs index db0e76d2bc7..74828721b6a 100644 --- a/csharp/src/Ice/OutgoingAsync.cs +++ b/csharp/src/Ice/OutgoingAsync.cs @@ -1140,7 +1140,8 @@ namespace IceInternal { Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(proxy_.iceReference().getProtocol())); observer_ = ObserverHelper.get(proxy_, operation, null); - _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_); + bool compress; // Not used for proxy flush batch requests. + _batchRequestNum = proxy_.iceGetBatchRequestQueue().swap(os_, out compress); invokeImpl(true); // userThread = true } @@ -1198,13 +1199,14 @@ namespace IceInternal _connection = connection; } - public void invoke(string operation) + public void invoke(string operation, Ice.CompressBatch compressBatch) { observer_ = ObserverHelper.get(instance_, operation); try { int status; - int batchRequestNum = _connection.getBatchRequestQueue().swap(os_); + bool compress; + int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out compress); if(batchRequestNum == 0) { status = AsyncStatusSent; @@ -1215,7 +1217,20 @@ namespace IceInternal } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + bool comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress; + } + status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum); } if((status & AsyncStatusSent) != 0) @@ -1311,7 +1326,7 @@ namespace IceInternal _useCount = 1; } - public void flushConnection(Ice.ConnectionI con) + public void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch) { lock(this) { @@ -1321,14 +1336,28 @@ namespace IceInternal try { var flushBatch = new FlushBatch(this, instance_, _observer); - int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); + bool compress; + int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out compress); if(batchRequestNum == 0) { flushBatch.sent(); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + bool comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); } } catch(Ice.LocalException) @@ -1338,15 +1367,15 @@ namespace IceInternal } } - public void invoke(string operation) + public void invoke(string operation, Ice.CompressBatch compressBatch) { _observer = ObserverHelper.get(instance_, operation); if(_observer != null) { _observer.attach(); } - instance_.outgoingConnectionFactory().flushAsyncBatchRequests(this); - instance_.objectAdapterFactory().flushAsyncBatchRequests(this); + instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this); + instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this); check(true); } diff --git a/csharp/src/Ice/Reference.cs b/csharp/src/Ice/Reference.cs index cfd1019cc74..f786dd5e18e 100644 --- a/csharp/src/Ice/Reference.cs +++ b/csharp/src/Ice/Reference.cs @@ -237,6 +237,25 @@ namespace IceInternal } } + public bool getCompressOverride(out bool compress) + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + compress = defaultsAndOverrides.overrideCompressValue; + } + else if(overrideCompress_) + { + compress = compress_; + } + else + { + compress = false; + return false; + } + return true; + } + public abstract bool isIndirect(); public abstract bool isWellKnown(); @@ -709,7 +728,7 @@ namespace IceInternal _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - bool compress; + bool compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -718,10 +737,6 @@ namespace IceInternal { compress = compress_; } - else - { - compress = _fixedConnection.endpoint().compress(); - } return proxy.iceSetRequestHandler(new ConnectionRequestHandler(this, _fixedConnection, compress)); } diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs index 0a515a0b2e9..d6f4f68e4d3 100644 --- a/csharp/test/Ice/ami/AllTests.cs +++ b/csharp/test/Ice/ami/AllTests.cs @@ -2147,6 +2147,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); SentCallback cb = new SentCallback(); Task t = b1.ice_getConnection().flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, progress:new Progress( sentSyncrhonously => { @@ -2168,6 +2169,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); Task t = b1.ice_getConnection().flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, progress: new Progress( sentSynchronously => { @@ -2189,7 +2191,10 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); @@ -2207,7 +2212,10 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushExCallback cb = new FlushExCallback(cookie); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(!r.isSent()); @@ -2225,7 +2233,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2244,7 +2252,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushExCallback cb = new FlushExCallback(); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2274,6 +2282,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2300,6 +2309,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushExCallback cb = new FlushExCallback(cookie); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2325,7 +2335,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -2352,7 +2362,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushExCallback cb = new FlushExCallback(); - Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(); + Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -2386,6 +2396,7 @@ public class AllTests : TestCommon.AllTests SentCallback cb = new SentCallback(); Task t = communicator.flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, progress: new Progress( sentSynchronously => { @@ -2407,6 +2418,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); SentCallback cb = new SentCallback(); Task t = communicator.flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, progress:new Progress( sentSynchronously => { @@ -2435,6 +2447,7 @@ public class AllTests : TestCommon.AllTests SentCallback cb = new SentCallback(); Task t = communicator.flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, new Progress(sentSynchronously => { cb.sent(sentSynchronously); @@ -2462,6 +2475,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); SentCallback cb = new SentCallback(); Task t = communicator.flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, new Progress( sentSynchronously => { @@ -2490,6 +2504,7 @@ public class AllTests : TestCommon.AllTests b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); SentCallback cb = new SentCallback(); Task t = communicator.flushBatchRequestsAsync( + Ice.CompressBatch.BasedOnProxy, new Progress( sentSynchronously => { @@ -2512,7 +2527,9 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); @@ -2530,7 +2547,9 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); // Exceptions are ignored! @@ -2554,7 +2573,9 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b2.opBatch(); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); @@ -2579,7 +2600,9 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); // Exceptions are ignored! @@ -2604,7 +2627,9 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(cb.completedAsync, cookie); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + cb.completedAsync, + cookie); r.whenSent(cb.sentAsync); cb.check(); test(r.isSent()); // Exceptions are ignored! @@ -2622,7 +2647,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2641,7 +2666,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2665,7 +2690,7 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b2.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2691,7 +2716,7 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2717,7 +2742,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted(cb.exception); r.whenSent(cb.sent); cb.check(); @@ -2744,6 +2769,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2770,6 +2796,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2801,6 +2828,7 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2834,6 +2862,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2867,6 +2896,7 @@ public class AllTests : TestCommon.AllTests b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(cookie); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, (Ice.AsyncResult result) => { cb.completedAsync(result); @@ -2892,7 +2922,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -2919,7 +2949,7 @@ public class AllTests : TestCommon.AllTests b1.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -2951,7 +2981,7 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b2.opBatch(); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -2985,7 +3015,7 @@ public class AllTests : TestCommon.AllTests b2.opBatch(); b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -3019,7 +3049,7 @@ public class AllTests : TestCommon.AllTests b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); FlushCallback cb = new FlushCallback(); - Ice.AsyncResult r = communicator.begin_flushBatchRequests(); + Ice.AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); r.whenCompleted( (Ice.Exception ex) => { @@ -3254,7 +3284,7 @@ public class AllTests : TestCommon.AllTests Ice.Connection con = p.ice_getConnection(); p2 = p.ice_batchOneway() as Test.TestIntfPrx; p2.ice_ping(); - r = con.begin_flushBatchRequests(); + r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); test(r.getConnection() == con); test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected @@ -3265,7 +3295,7 @@ public class AllTests : TestCommon.AllTests // p2 = p.ice_batchOneway() as Test.TestIntfPrx; p2.ice_ping(); - r = communicator.begin_flushBatchRequests(); + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); test(r.getConnection() == null); // Expected test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected diff --git a/csharp/test/Ice/operations/BatchOneways.cs b/csharp/test/Ice/operations/BatchOneways.cs index ee21049b0c7..1eb020416ba 100644 --- a/csharp/test/Ice/operations/BatchOneways.cs +++ b/csharp/test/Ice/operations/BatchOneways.cs @@ -162,5 +162,41 @@ class BatchOneways ic.destroy(); } + + p.ice_ping(); + if(p.ice_getConnection() != null && + p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").Equals("")) + { + Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway(); + + Test.MyClassPrx batchC1 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_compress(false)); + Test.MyClassPrx batchC2 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_compress(true)); + Test.MyClassPrx batchC3 = Test.MyClassPrxHelper.uncheckedCast(prx.ice_identity(identity)); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.Yes); + + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.No); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC3.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + } } } diff --git a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java index e4eea1ecf6d..6eda6d0aeaf 100644 --- a/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java +++ b/java-compat/src/Ice/src/main/java/Ice/CommunicatorI.java @@ -212,39 +212,40 @@ public final class CommunicatorI implements Communicator @Override public void - flushBatchRequests() + flushBatchRequests(Ice.CompressBatch compressBatch) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compressBatch)); } @Override public AsyncResult - begin_flushBatchRequests() + begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequestsInternal(null); + return begin_flushBatchRequestsInternal(compressBatch, null); } @Override public AsyncResult - begin_flushBatchRequests(Callback cb) + begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override public AsyncResult - begin_flushBatchRequests(Callback_Communicator_flushBatchRequests cb) + begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback_Communicator_flushBatchRequests cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override public AsyncResult - begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb, + begin_flushBatchRequests(Ice.CompressBatch compressBatch, + IceInternal.Functional_VoidCallback responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, IceInternal.Functional_BoolCallback sentCb) { - return begin_flushBatchRequestsInternal( + return begin_flushBatchRequestsInternal(compressBatch, new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) { @Override @@ -265,7 +266,7 @@ public final class CommunicatorI implements Communicator private static final String _flushBatchRequests_name = "flushBatchRequests"; private Ice.AsyncResult - begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch, IceInternal.CallbackBase cb) { IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory(); IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory(); @@ -279,8 +280,8 @@ public final class CommunicatorI implements Communicator _flushBatchRequests_name, cb); - connectionFactory.flushAsyncBatchRequests(result); - adapterFactory.flushAsyncBatchRequests(result); + connectionFactory.flushAsyncBatchRequests(compressBatch, result); + adapterFactory.flushAsyncBatchRequests(compressBatch, result); // // Inform the callback that we have finished initiating all of the diff --git a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java index 84131a38cfc..0130c676dd5 100644 --- a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java +++ b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java @@ -420,58 +420,62 @@ public final class ConnectionI extends IceInternal.EventHandler } @Override - public void flushBatchRequests() + public void flushBatchRequests(Ice.CompressBatch compressBatch) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compressBatch)); } private static final String _flushBatchRequests_name = "flushBatchRequests"; @Override - public Ice.AsyncResult begin_flushBatchRequests() + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch) { - return begin_flushBatchRequestsInternal(null); + return begin_flushBatchRequestsInternal(compressBatch, null); } @Override - public Ice.AsyncResult begin_flushBatchRequests(Callback cb) + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, Callback cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override - public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) + public Ice.AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, + Callback_Connection_flushBatchRequests cb) { - return begin_flushBatchRequestsInternal(cb); + return begin_flushBatchRequestsInternal(compressBatch, cb); } @Override - public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback responseCb, - IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, - IceInternal.Functional_BoolCallback sentCb) - { - return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) - { - @Override - public final void _iceCompleted(AsyncResult result) - { - try - { - result.getConnection().end_flushBatchRequests(result); - } - catch(Exception ex) - { - _exceptionCb.apply(ex); - } - } - }); - } - - private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + public AsyncResult begin_flushBatchRequests(Ice.CompressBatch compressBatch, + IceInternal.Functional_VoidCallback responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, + IceInternal.Functional_BoolCallback sentCb) + { + return begin_flushBatchRequestsInternal(compressBatch, + new IceInternal.Functional_CallbackBase(false, exceptionCb, sentCb) + { + @Override + public final void _iceCompleted(AsyncResult result) + { + try + { + result.getConnection().end_flushBatchRequests(result); + } + catch(Exception ex) + { + _exceptionCb.apply(ex); + } + } + }); + } + + private Ice.AsyncResult begin_flushBatchRequestsInternal(Ice.CompressBatch compressBatch, + IceInternal.CallbackBase cb) { IceInternal.ConnectionFlushBatch result = new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, _flushBatchRequests_name, cb); - result.invoke(); + result.invoke(compressBatch); return result; } diff --git a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java index 9ab45240784..4b4029abbd2 100644 --- a/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java +++ b/java-compat/src/Ice/src/main/java/Ice/ObjectAdapterI.java @@ -746,7 +746,7 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushAsyncBatchRequests(IceInternal.CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, IceInternal.CommunicatorFlushBatch outAsync) { List<IncomingConnectionFactory> f; synchronized(this) @@ -755,7 +755,7 @@ public final class ObjectAdapterI implements ObjectAdapter } for(IncomingConnectionFactory p : f) { - p.flushAsyncBatchRequests(outAsync); + p.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java index 3db2ac67177..6d7f2030ac0 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/BatchRequestQueue.java @@ -23,7 +23,7 @@ public class BatchRequestQueue @Override public void enqueue() { - enqueueBatchRequest(); + enqueueBatchRequest(_proxy); } @Override @@ -59,6 +59,7 @@ public class BatchRequestQueue _batchStream = new Ice.OutputStream(instance, Protocol.currentProtocolEncoding); _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); + _batchCompress = false; _request = new BatchRequestI(); _maxSize = instance.batchAutoFlushSize(); @@ -112,6 +113,11 @@ public class BatchRequestQueue } else { + Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -141,7 +147,7 @@ public class BatchRequestQueue } synchronized public int - swap(Ice.OutputStream os) + swap(Ice.OutputStream os, Ice.BooleanHolder compress) { if(_batchRequestNum == 0) { @@ -161,12 +167,17 @@ public class BatchRequestQueue } int requestNum = _batchRequestNum; + if(compress != null) + { + compress.value = _batchCompress; + } _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) @@ -218,9 +229,14 @@ public class BatchRequestQueue } } - private void enqueueBatchRequest() + private void enqueueBatchRequest(Ice.ObjectPrx proxy) { assert(_batchMarker < _batchStream.size()); + Boolean compress = ((Ice.ObjectPrxHelperBase)proxy)._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -231,6 +247,7 @@ public class BatchRequestQueue private boolean _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private boolean _batchCompress; private BatchRequestI _request; private Ice.LocalException _exception; private int _maxSize; diff --git a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java index 309c9dd6a55..d6da14b8f24 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/CommunicatorFlushBatch.java @@ -43,7 +43,7 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI _useCount = 1; } - public void flushConnection(final Ice.ConnectionI con) + public void flushConnection(final Ice.ConnectionI con, final Ice.CompressBatch compressBatch) { class FlushBatch extends OutgoingAsyncBase { @@ -96,7 +96,8 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI try { final FlushBatch flushBatch = new FlushBatch(); - final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); + final Ice.BooleanHolder compress = new Ice.BooleanHolder(); + final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), compress); if(batchRequestNum == 0) { flushBatch.sent(); @@ -108,14 +109,40 @@ public class CommunicatorFlushBatch extends IceInternal.AsyncResultI @Override public Void call() throws RetryException { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); return null; } }); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum); } } catch(RetryException ex) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java index 7824d3154c1..e1839020fa6 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ConnectionFlushBatch.java @@ -42,12 +42,12 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase return _connection; } - public void invoke() + public void invoke(final Ice.CompressBatch compressBatch) { try { - final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os); - + final Ice.BooleanHolder compress = new Ice.BooleanHolder(); + final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os, compress); int status; if(batchRequestNum == 0) { @@ -64,13 +64,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBase @Override public Integer call() throws RetryException { - return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, batchRequestNum); } }); } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + boolean comp; + if(compressBatch == Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = compress.value; + } + status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum); } if((status & AsyncStatus.Sent) > 0) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java index ebc59c5e690..b9e84ce5d0e 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/FixedReference.java @@ -250,7 +250,7 @@ public class FixedReference extends Reference _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - boolean compress; + boolean compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -259,10 +259,6 @@ public class FixedReference extends Reference { compress = _compress; } - else - { - compress = _fixedConnection.endpoint().compress(); - } RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress); if(getInstance().queueRequests()) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java index 108df4607e0..60514dfd230 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java @@ -196,13 +196,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { for(Ice.ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { try { - outAsync.flushConnection(c); + outAsync.flushConnection(c, compressBatch); } catch(Ice.LocalException ex) { diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java index bfec67de6a6..c44a0b2ef40 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ObjectAdapterFactory.java @@ -218,7 +218,7 @@ public final class ObjectAdapterFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<Ice.ObjectAdapterI> adapters; synchronized(this) @@ -228,7 +228,7 @@ public final class ObjectAdapterFactory for(Ice.ObjectAdapterI adapter : adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java index b51856cf3e6..3102f3a500f 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java @@ -271,7 +271,7 @@ public final class OutgoingConnectionFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<Ice.ConnectionI> c = new java.util.LinkedList<Ice.ConnectionI>(); @@ -296,7 +296,7 @@ public final class OutgoingConnectionFactory { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(Ice.LocalException ex) { diff --git a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java index 7a53d1b2c24..c83f46d964a 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/ProxyFlushBatch.java @@ -28,7 +28,7 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBase { super(prx, operation, callback); _observer = ObserverHelper.get(prx, operation); - _batchRequestNum = prx._getBatchRequestQueue().swap(_os); + _batchRequestNum = prx._getBatchRequestQueue().swap(_os, null); } @Override diff --git a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java index a1869b1fcda..c3b0e1b8d76 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/Reference.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/Reference.java @@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable return _hashValue; } + public java.lang.Boolean + getCompressOverride() + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue); + } + else if(_overrideCompress) + { + return Boolean.valueOf(_compress); + } + return null; // Null indicates that compress is not overriden. + } + // // Utility methods // diff --git a/java-compat/test/src/main/java/test/Ice/ami/AMI.java b/java-compat/test/src/main/java/test/Ice/ami/AMI.java index c707f20e6c7..e3d3b792338 100644 --- a/java-compat/test/src/main/java/test/Ice/ami/AMI.java +++ b/java-compat/test/src/main/java/test/Ice/ami/AMI.java @@ -1933,6 +1933,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -1964,6 +1965,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushExCallback cb = new FlushExCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -1995,6 +1997,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Connection_flushBatchRequests() { @Override @@ -2026,6 +2029,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushExCallback cb = new FlushExCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Connection_flushBatchRequests() { @Override @@ -2062,6 +2066,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -2093,6 +2098,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -2129,6 +2135,7 @@ public class AMI b2.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -2167,6 +2174,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -2205,6 +2213,7 @@ public class AMI b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback() { @Override @@ -2236,6 +2245,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Communicator_flushBatchRequests() { @Override @@ -2267,6 +2277,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Communicator_flushBatchRequests() { @Override @@ -2303,6 +2314,7 @@ public class AMI b2.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Communicator_flushBatchRequests() { @Override @@ -2341,6 +2353,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Communicator_flushBatchRequests() { @Override @@ -2379,6 +2392,7 @@ public class AMI b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, new Ice.Callback_Communicator_flushBatchRequests() { @Override @@ -2500,7 +2514,7 @@ public class AMI Ice.Connection con = p.ice_getConnection(); p2 = (TestIntfPrx)p.ice_batchOneway(); p2.ice_ping(); - r = con.begin_flushBatchRequests(); + r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); test(r.getConnection() == con); test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected @@ -2511,7 +2525,7 @@ public class AMI // p2 = (TestIntfPrx)p.ice_batchOneway(); p2.ice_ping(); - r = communicator.begin_flushBatchRequests(); + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); test(r.getConnection() == null); // Expected test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected diff --git a/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java b/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java index 8d886c20d78..f64ea6dda04 100644 --- a/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java +++ b/java-compat/test/src/main/java/test/Ice/ami/lambda/AMI.java @@ -875,6 +875,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -885,6 +886,7 @@ public class AMI final FlushCallback cb2 = new FlushCallback(); Ice.AsyncResult r2 = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb2.exception(ex), (boolean sentSynchronously) -> cb2.sent(sentSynchronously)); @@ -904,6 +906,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushExCallback cb = new FlushExCallback(); Ice.AsyncResult r = b1.ice_getConnection().begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -929,6 +932,7 @@ public class AMI b1.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -949,6 +953,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -974,6 +979,7 @@ public class AMI b2.opBatch(); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -1001,6 +1007,7 @@ public class AMI b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); @@ -1028,6 +1035,7 @@ public class AMI b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait); final FlushCallback cb = new FlushCallback(); Ice.AsyncResult r = communicator.begin_flushBatchRequests( + Ice.CompressBatch.BasedOnProxy, null, (Ice.Exception ex) -> cb.exception(ex), (boolean sentSynchronously) -> cb.sent(sentSynchronously)); diff --git a/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java b/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java index ed523af5dbe..379c64d1032 100644 --- a/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java +++ b/java-compat/test/src/main/java/test/Ice/interrupt/AllTests.java @@ -476,7 +476,7 @@ public class AllTests p2.op(); p2.op(); - AsyncResult r = p2.ice_getConnection().begin_flushBatchRequests(); + AsyncResult r = p2.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); mainThread.interrupt(); try { @@ -495,7 +495,8 @@ public class AllTests final CallbackBase cb = new CallbackBase(); Ice.Connection con = p2.ice_getConnection(); mainThread.interrupt(); - con.begin_flushBatchRequests(new Callback_Connection_flushBatchRequests() + con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + new Callback_Connection_flushBatchRequests() { @Override public void sent(boolean sentSynchronously) @@ -525,7 +526,7 @@ public class AllTests p2.op(); p2.op(); - AsyncResult r = communicator.begin_flushBatchRequests(); + AsyncResult r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy); mainThread.interrupt(); try { @@ -543,7 +544,8 @@ public class AllTests final CallbackBase cb = new CallbackBase(); mainThread.interrupt(); - communicator.begin_flushBatchRequests(new Callback_Communicator_flushBatchRequests() + communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + new Callback_Communicator_flushBatchRequests() { @Override public void sent(boolean sentSynchronously) diff --git a/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java b/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java index d3240fc5a06..5e62fe3c230 100644 --- a/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java +++ b/java-compat/test/src/main/java/test/Ice/operations/BatchOneways.java @@ -79,6 +79,11 @@ class BatchOneways MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); batch.ice_flushBatchRequests(); // Empty flush + if(batch.ice_getConnection() != null) + { + batch.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + } + batch.ice_getCommunicator().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); p.opByteSOnewayCallCount(); // Reset the call count @@ -177,5 +182,41 @@ class BatchOneways ic.destroy(); } + + p.ice_ping(); + if(p.ice_getConnection() != null && + p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").equals("")) + { + Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway(); + + MyClassPrx batchC1 = MyClassPrxHelper.uncheckedCast(prx.ice_compress(false)); + MyClassPrx batchC2 = MyClassPrxHelper.uncheckedCast(prx.ice_compress(true)); + MyClassPrx batchC3 = MyClassPrxHelper.uncheckedCast(prx.ice_identity(identity)); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.Yes); + + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.No); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC3.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy); + } } } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java index 90d1d89467f..48cfd306144 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/CommunicatorI.java @@ -211,18 +211,18 @@ public final class CommunicatorI implements Communicator } @Override - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - _iceI_flushBatchRequestsAsync().waitForResponse(); + _iceI_flushBatchRequestsAsync(compressBatch).waitForResponse(); } @Override - public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync() + public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch) { - return _iceI_flushBatchRequestsAsync(); + return _iceI_flushBatchRequestsAsync(compressBatch); } - public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync() + public com.zeroc.IceInternal.CommunicatorFlushBatch _iceI_flushBatchRequestsAsync(CompressBatch compressBatch) { com.zeroc.IceInternal.OutgoingConnectionFactory connectionFactory = _instance.outgoingConnectionFactory(); com.zeroc.IceInternal.ObjectAdapterFactory adapterFactory = _instance.objectAdapterFactory(); @@ -234,8 +234,8 @@ public final class CommunicatorI implements Communicator com.zeroc.IceInternal.CommunicatorFlushBatch f = new com.zeroc.IceInternal.CommunicatorFlushBatch(this, _instance); - connectionFactory.flushAsyncBatchRequests(f); - adapterFactory.flushAsyncBatchRequests(f); + connectionFactory.flushAsyncBatchRequests(compressBatch, f); + adapterFactory.flushAsyncBatchRequests(compressBatch, f); // // Inform the callback that we have finished initiating all of the diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java index 13b2d7a1dcc..4634a5ca4ec 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java @@ -344,8 +344,7 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler } synchronized public int - sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response, - int batchRequestNum) + sendAsyncRequest(OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum) throws com.zeroc.IceInternal.RetryException { final OutputStream os = out.getOs(); @@ -431,17 +430,17 @@ public final class ConnectionI extends com.zeroc.IceInternal.EventHandler } @Override - public void flushBatchRequests() + public void flushBatchRequests(CompressBatch compressBatch) { - ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync()); + ObjectPrx.waitForResponseForCompletion(flushBatchRequestsAsync(compressBatch)); } @Override - public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync() + public java.util.concurrent.CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch) { com.zeroc.IceInternal.ConnectionFlushBatch f = new com.zeroc.IceInternal.ConnectionFlushBatch(this, _communicator, _instance); - f.invoke(); + f.invoke(compressBatch); return f; } diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java index de83357a228..2149b60a207 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ObjectAdapterI.java @@ -746,7 +746,8 @@ public final class ObjectAdapterI implements ObjectAdapter } public void - flushAsyncBatchRequests(com.zeroc.IceInternal.CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, + com.zeroc.IceInternal.CommunicatorFlushBatch outAsync) { List<IncomingConnectionFactory> f; synchronized(this) @@ -755,7 +756,7 @@ public final class ObjectAdapterI implements ObjectAdapter } for(IncomingConnectionFactory p : f) { - p.flushAsyncBatchRequests(outAsync); + p.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java index 3bb4d920c16..ed776c67636 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/BatchRequestQueue.java @@ -23,7 +23,7 @@ public class BatchRequestQueue @Override public void enqueue() { - enqueueBatchRequest(); + enqueueBatchRequest(_proxy); } @Override @@ -59,6 +59,7 @@ public class BatchRequestQueue _batchStream = new com.zeroc.Ice.OutputStream(instance, Protocol.currentProtocolEncoding); _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); + _batchCompress = false; _request = new BatchRequestI(); _maxSize = instance.batchAutoFlushSize(); @@ -112,6 +113,11 @@ public class BatchRequestQueue } else { + Boolean compress = proxy._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -140,12 +146,18 @@ public class BatchRequestQueue } } - synchronized public int + public class SwapResult + { + public int batchRequestNum; + public boolean compress; + }; + + synchronized public SwapResult swap(com.zeroc.Ice.OutputStream os) { if(_batchRequestNum == 0) { - return 0; + return null; } waitStreamInUse(true); @@ -160,20 +172,23 @@ public class BatchRequestQueue _batchStream.resize(_batchMarker); } - int requestNum = _batchRequestNum; + SwapResult result = new SwapResult(); + result.batchRequestNum = _batchRequestNum; + result.compress = _batchCompress; _batchStream.swap(os); // // Reset the batch. // _batchRequestNum = 0; + _batchCompress = false; _batchStream.writeBlob(Protocol.requestBatchHdr); _batchMarker = _batchStream.size(); if(lastRequest != null) { _batchStream.writeBlob(lastRequest); } - return requestNum; + return result; } synchronized public void @@ -218,9 +233,14 @@ public class BatchRequestQueue } } - private void enqueueBatchRequest() + private void enqueueBatchRequest(com.zeroc.Ice.ObjectPrx proxy) { assert(_batchMarker < _batchStream.size()); + Boolean compress = proxy._getReference().getCompressOverride(); + if(compress != null) + { + _batchCompress |= compress.booleanValue(); + } _batchMarker = _batchStream.size(); ++_batchRequestNum; } @@ -231,6 +251,7 @@ public class BatchRequestQueue private boolean _batchStreamCanFlush; private int _batchRequestNum; private int _batchMarker; + private boolean _batchCompress; private BatchRequestI _request; private com.zeroc.Ice.LocalException _exception; private int _maxSize; diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java index a7c4dc65b38..c6193bb48aa 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/CommunicatorFlushBatch.java @@ -36,7 +36,7 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> complete(null); } - public void flushConnection(final com.zeroc.Ice.ConnectionI con) + public void flushConnection(final com.zeroc.Ice.ConnectionI con, final com.zeroc.Ice.CompressBatch compressBatch) { class FlushBatch extends OutgoingAsyncBaseI<Void> { @@ -106,8 +106,8 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> try { final FlushBatch flushBatch = new FlushBatch(); - final int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs()); - if(batchRequestNum == 0) + final BatchRequestQueue.SwapResult r = con.getBatchRequestQueue().swap(flushBatch.getOs()); + if(r == null) { flushBatch.sent(); } @@ -118,14 +118,40 @@ public class CommunicatorFlushBatch extends InvocationFutureI<Void> @Override public Void call() throws RetryException { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum); return null; } }); } else { - con.sendAsyncRequest(flushBatch, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + con.sendAsyncRequest(flushBatch, comp, false, r.batchRequestNum); } } catch(RetryException ex) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java index 2d4a35a0f82..588905e9ba5 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ConnectionFlushBatch.java @@ -45,14 +45,13 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void> super.markCompleted(); } - public void invoke() + public void invoke(com.zeroc.Ice.CompressBatch compressBatch) { try { - final int batchRequestNum = _connection.getBatchRequestQueue().swap(_os); - + final BatchRequestQueue.SwapResult r = _connection.getBatchRequestQueue().swap(_os); int status; - if(batchRequestNum == 0) + if(r == null) { status = AsyncStatus.Sent; if(sent()) @@ -68,13 +67,39 @@ public class ConnectionFlushBatch extends OutgoingAsyncBaseI<Void> public Integer call() throws RetryException { - return _connection.sendAsyncRequest(ConnectionFlushBatch.this, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + return _connection.sendAsyncRequest(ConnectionFlushBatch.this, comp, false, r.batchRequestNum); } }); } else { - status = _connection.sendAsyncRequest(this, false, false, batchRequestNum); + boolean comp = false; + if(compressBatch == com.zeroc.Ice.CompressBatch.Yes) + { + comp = true; + } + else if(compressBatch == com.zeroc.Ice.CompressBatch.No) + { + comp = false; + } + else + { + comp = r.compress; + } + status = _connection.sendAsyncRequest(this, comp, false, r.batchRequestNum); } if((status & AsyncStatus.Sent) > 0) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java index cfa3d1e871b..a2a3de32109 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/FixedReference.java @@ -251,7 +251,7 @@ public class FixedReference extends Reference _fixedConnection.throwException(); // Throw in case our connection is already destroyed. - boolean compress; + boolean compress = false; if(defaultsAndOverrides.overrideCompress) { compress = defaultsAndOverrides.overrideCompressValue; @@ -260,10 +260,6 @@ public class FixedReference extends Reference { compress = _compress; } - else - { - compress = _fixedConnection.endpoint().compress(); - } RequestHandler handler = new ConnectionRequestHandler(this, _fixedConnection, compress); if(getInstance().queueRequests()) diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java index 98dd813fb66..2aacbaebdc3 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/IncomingConnectionFactory.java @@ -192,13 +192,13 @@ public final class IncomingConnectionFactory extends EventHandler implements Con } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { for(ConnectionI c : connections()) // connections() is synchronized, no need to synchronize here. { try { - outAsync.flushConnection(c); + outAsync.flushConnection(c, compressBatch); } catch(com.zeroc.Ice.LocalException ex) { diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java index 94d1417a663..ca27509d5cc 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ObjectAdapterFactory.java @@ -221,7 +221,7 @@ public final class ObjectAdapterFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<ObjectAdapterI> adapters; synchronized(this) @@ -231,7 +231,7 @@ public final class ObjectAdapterFactory for(ObjectAdapterI adapter : adapters) { - adapter.flushAsyncBatchRequests(outAsync); + adapter.flushAsyncBatchRequests(compressBatch, outAsync); } } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java index 9bd778ecfb4..99f29b8cbd1 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/OutgoingConnectionFactory.java @@ -277,7 +277,7 @@ public final class OutgoingConnectionFactory } public void - flushAsyncBatchRequests(CommunicatorFlushBatch outAsync) + flushAsyncBatchRequests(com.zeroc.Ice.CompressBatch compressBatch, CommunicatorFlushBatch outAsync) { java.util.List<ConnectionI> c = new java.util.LinkedList<>(); @@ -302,7 +302,7 @@ public final class OutgoingConnectionFactory { try { - outAsync.flushConnection(conn); + outAsync.flushConnection(conn, compressBatch); } catch(LocalException ex) { diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java index 527a26658bd..870b6be3099 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ProxyFlushBatch.java @@ -15,7 +15,8 @@ public class ProxyFlushBatch extends ProxyOutgoingAsyncBaseI<Void> { super(prx, "ice_flushBatchRequests"); _observer = ObserverHelper.get(prx, "ice_flushBatchRequests"); - _batchRequestNum = prx._getBatchRequestQueue().swap(_os); + BatchRequestQueue.SwapResult r = prx._getBatchRequestQueue().swap(_os); + _batchRequestNum = r != null ? r.batchRequestNum : 0; } @Override diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java index 51cfc65c06e..43a9befc735 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Reference.java @@ -247,6 +247,21 @@ public abstract class Reference implements Cloneable return _hashValue; } + public java.lang.Boolean + getCompressOverride() + { + DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCompress) + { + return Boolean.valueOf(defaultsAndOverrides.overrideCompressValue); + } + else if(_overrideCompress) + { + return Boolean.valueOf(_compress); + } + return null; // Null indicates that compress is not overriden. + } + // // Utility methods // diff --git a/java/test/src/main/java/test/Ice/ami/AMI.java b/java/test/src/main/java/test/Ice/ami/AMI.java index 0c6b125897a..bc47eb8882f 100644 --- a/java/test/src/main/java/test/Ice/ami/AMI.java +++ b/java/test/src/main/java/test/Ice/ami/AMI.java @@ -16,6 +16,7 @@ import java.util.concurrent.CompletionException; import com.zeroc.Ice.InvocationFuture; import com.zeroc.Ice.Util; +import com.zeroc.Ice.CompressBatch; import test.Ice.ami.Test.CloseMode; import test.Ice.ami.Test.TestIntfPrx; @@ -375,7 +376,8 @@ public class AMI ice_batchOneway(); b1.opBatch(); b1.opBatch(); - CompletableFuture<Void> r = b1.ice_getConnection().flushBatchRequestsAsync(); + CompletableFuture<Void> r = + b1.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -397,7 +399,8 @@ public class AMI ice_batchOneway(); b1.opBatch(); b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait); - CompletableFuture<Void> r = b1.ice_getConnection().flushBatchRequestsAsync(); + CompletableFuture<Void> r = + b1.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex != null); @@ -424,7 +427,7 @@ public class AMI ice_batchOneway(); b1.opBatch(); b1.opBatch(); - CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(); + CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -446,7 +449,7 @@ public class AMI ice_batchOneway(); b1.opBatch(); b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait); - CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(); + CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -473,7 +476,7 @@ public class AMI b1.opBatch(); b2.opBatch(); b2.opBatch(); - CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(); + CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -502,7 +505,7 @@ public class AMI b1.opBatch(); b2.opBatch(); b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait); - CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(); + CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -531,7 +534,7 @@ public class AMI b2.opBatch(); b1.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait); b2.ice_getConnection().close(com.zeroc.Ice.ConnectionClose.CloseGracefullyAndWait); - CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(); + CompletableFuture<Void> r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { test(ex == null); @@ -654,7 +657,8 @@ public class AMI com.zeroc.Ice.Connection con = p.ice_getConnection(); TestIntfPrx p2 = p.ice_batchOneway(); p2.ice_ping(); - InvocationFuture<Void> r = Util.getInvocationFuture(con.flushBatchRequestsAsync()); + InvocationFuture<Void> r = + Util.getInvocationFuture(con.flushBatchRequestsAsync(CompressBatch.BasedOnProxy)); test(r.getConnection() == con); test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected @@ -665,7 +669,7 @@ public class AMI // p2 = p.ice_batchOneway(); p2.ice_ping(); - r = Util.getInvocationFuture(communicator.flushBatchRequestsAsync()); + r = Util.getInvocationFuture(communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy)); test(r.getConnection() == null); // Expected test(r.getCommunicator() == communicator); test(r.getProxy() == null); // Expected diff --git a/java/test/src/main/java/test/Ice/interrupt/AllTests.java b/java/test/src/main/java/test/Ice/interrupt/AllTests.java index 62a1d6034e1..c7b00170209 100644 --- a/java/test/src/main/java/test/Ice/interrupt/AllTests.java +++ b/java/test/src/main/java/test/Ice/interrupt/AllTests.java @@ -21,6 +21,7 @@ import com.zeroc.Ice.Connection; import com.zeroc.Ice.LocalException; import com.zeroc.Ice.Util; import com.zeroc.Ice.InvocationFuture; +import com.zeroc.Ice.CompressBatch; import test.Ice.interrupt.Test.CannotInterruptException; import test.Ice.interrupt.Test.TestIntfControllerPrx; @@ -435,7 +436,7 @@ public class AllTests p2.op(); p2.op(); - r = p2.ice_getConnection().flushBatchRequestsAsync(); + r = p2.ice_getConnection().flushBatchRequestsAsync(CompressBatch.BasedOnProxy); mainThread.interrupt(); try { @@ -462,7 +463,7 @@ public class AllTests final Callback cb = new Callback(); com.zeroc.Ice.Connection con = p2.ice_getConnection(); mainThread.interrupt(); - r = con.flushBatchRequestsAsync(); + r = con.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); r.whenComplete((result, ex) -> test(ex == null)); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { @@ -486,7 +487,7 @@ public class AllTests p2.op(); p2.op(); - r = communicator.flushBatchRequestsAsync(); + r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); mainThread.interrupt(); try { @@ -512,7 +513,7 @@ public class AllTests final Callback cb = new Callback(); mainThread.interrupt(); - r = communicator.flushBatchRequestsAsync(); + r = communicator.flushBatchRequestsAsync(CompressBatch.BasedOnProxy); r.whenComplete((result, ex) -> test(ex == null)); Util.getInvocationFuture(r).whenSent((sentSynchronously, ex) -> { diff --git a/java/test/src/main/java/test/Ice/operations/BatchOneways.java b/java/test/src/main/java/test/Ice/operations/BatchOneways.java index 30a5672e80d..be982010978 100644 --- a/java/test/src/main/java/test/Ice/operations/BatchOneways.java +++ b/java/test/src/main/java/test/Ice/operations/BatchOneways.java @@ -73,6 +73,11 @@ class BatchOneways MyClassPrx batch = p.ice_batchOneway(); batch.ice_flushBatchRequests(); // Empty flush + if(batch.ice_getConnection() != null) + { + batch.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy); + } + batch.ice_getCommunicator().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy); p.opByteSOnewayCallCount(); // Reset the call count @@ -171,5 +176,41 @@ class BatchOneways ic.destroy(); } + + p.ice_ping(); + if(p.ice_getConnection() != null && + p.ice_getCommunicator().getProperties().getProperty("Ice.Override.Compress").equals("")) + { + com.zeroc.Ice.ObjectPrx prx = p.ice_getConnection().createProxy(p.ice_getIdentity()).ice_batchOneway(); + + MyClassPrx batchC1 = MyClassPrx.uncheckedCast(prx.ice_compress(false)); + MyClassPrx batchC2 = MyClassPrx.uncheckedCast(prx.ice_compress(true)); + MyClassPrx batchC3 = MyClassPrx.uncheckedCast(prx.ice_identity(identity)); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.Yes); + + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.No); + + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC2.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy); + + batchC1.opByteSOneway(bs1); + batchC3.opByteSOneway(bs1); + batchC1.opByteSOneway(bs1); + batchC1.ice_getConnection().flushBatchRequests(com.zeroc.Ice.CompressBatch.BasedOnProxy); + } } } diff --git a/js/src/Ice/ConnectRequestHandler.js b/js/src/Ice/ConnectRequestHandler.js index 6b33f5fb760..6126c277234 100644 --- a/js/src/Ice/ConnectRequestHandler.js +++ b/js/src/Ice/ConnectRequestHandler.js @@ -36,7 +36,6 @@ class ConnectRequestHandler this._initialized = false; this._connection = null; - this._compress = false; this._exception = null; this._requests = []; } @@ -67,7 +66,7 @@ class ConnectRequestHandler this._requests.push(out); return AsyncStatus.Queued; } - return out.invokeRemote(this._connection, this._compress, this._response); + return out.invokeRemote(this._connection, this._response); } asyncRequestCanceled(out, ex) @@ -113,11 +112,11 @@ class ConnectRequestHandler // // Implementation of Reference_GetConnectionCallback // - setConnection(values) + setConnection(connection) { Debug.assert(this._exception === null && this._connection === null); - [this._connection, this._compress] = values; + this._connection = connection; // // If this proxy is for a non-local object, and we are using a router, then @@ -213,7 +212,7 @@ class ConnectRequestHandler { try { - request.invokeRemote(this._connection, this._compress, this._response); + request.invokeRemote(this._connection, this._response); } catch(ex) { @@ -238,7 +237,7 @@ class ConnectRequestHandler if(this._reference.getCacheConnection() && exception === null) { - this._requestHandler = new ConnectionRequestHandler(this._reference, this._connection, this._compress); + this._requestHandler = new ConnectionRequestHandler(this._reference, this._connection); this._proxies.forEach(proxy => proxy._updateRequestHandler(this, this._requestHandler)); } diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js index 4541098ae78..4243b438ad9 100644 --- a/js/src/Ice/ConnectionI.js +++ b/js/src/Ice/ConnectionI.js @@ -67,7 +67,6 @@ class MessageInfo this.stream = new InputStream(instance, Protocol.currentProtocolEncoding); this.invokeNum = 0; this.requestId = 0; - this.compress = false; this.servantManager = null; this.adapter = null; this.outAsync = null; @@ -367,7 +366,7 @@ class ConnectionI } } - sendAsyncRequest(out, compress, response, batchRequestNum) + sendAsyncRequest(out, response, batchRequestNum) { let requestId = 0; const ostr = out.getOs(); @@ -424,7 +423,7 @@ class ConnectionI let status; try { - status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), compress, requestId)); + status = this.sendMessage(OutgoingMessage.create(out, out.getOs(), requestId)); } catch(ex) { @@ -572,7 +571,7 @@ class ConnectionI } } - sendResponse(os, compressFlag) + sendResponse(os) { Debug.assert(this._state > StateNotValidated); @@ -593,7 +592,7 @@ class ConnectionI throw this._exception; } - this.sendMessage(OutgoingMessage.createForStream(os, compressFlag !== 0, true)); + this.sendMessage(OutgoingMessage.createForStream(os, true)); if(this._state === StateClosing && this._dispatchCount === 0) { @@ -927,8 +926,7 @@ class ConnectionI if(info.invokeNum > 0) { - this.invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); + this.invokeAll(info.stream, info.invokeNum, info.requestId, info.servantManager, info.adapter); // // Don't increase count, the dispatch count is @@ -1444,7 +1442,7 @@ class ConnectionI os.writeByte(0); // compression status: always report 0 for CloseConnection. os.writeInt(Protocol.headerSize); // Message size. - if((this.sendMessage(OutgoingMessage.createForStream(os, false, false)) & AsyncStatus.Sent) > 0) + if((this.sendMessage(OutgoingMessage.createForStream(os, false)) & AsyncStatus.Sent) > 0) { // // Schedule the close timeout to wait for the peer to close the connection. @@ -1479,7 +1477,7 @@ class ConnectionI os.writeInt(Protocol.headerSize); // Message size. try { - this.sendMessage(OutgoingMessage.createForStream(os, false, false)); + this.sendMessage(OutgoingMessage.createForStream(os, false)); } catch(ex) { @@ -1769,8 +1767,8 @@ class ConnectionI // info.stream.pos = 8; const messageType = info.stream.readByte(); - info.compress = info.stream.readByte(); - if(info.compress === 2) + const compress = info.stream.readByte(); + if(compress === 2) { throw new Ice.FeatureNotSupportedException("Cannot uncompress compressed message"); } @@ -1902,7 +1900,7 @@ class ConnectionI return info; } - invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter) + invokeAll(stream, invokeNum, requestId, servantManager, adapter) { try { @@ -1914,7 +1912,6 @@ class ConnectionI let inc = new IncomingAsync(this._instance, this, adapter, !this._endpoint.datagram() && requestId !== 0, // response - compress, requestId); // @@ -2102,7 +2099,6 @@ class OutgoingMessage { this.stream = null; this.outAsync = null; - this.compress = false; this.requestId = 0; this.prepared = false; } @@ -2140,11 +2136,10 @@ class OutgoingMessage } } - static createForStream(stream, compress, adopt) + static createForStream(stream, adopt) { const m = new OutgoingMessage(); m.stream = stream; - m.compress = compress; m.adopt = adopt; m.isSent = false; m.requestId = 0; @@ -2152,11 +2147,10 @@ class OutgoingMessage return m; } - static create(out, stream, compress, requestId) + static create(out, stream, requestId) { const m = new OutgoingMessage(); m.stream = stream; - m.compress = compress; m.outAsync = out; m.requestId = requestId; m.isSent = false; diff --git a/js/src/Ice/ConnectionRequestHandler.js b/js/src/Ice/ConnectionRequestHandler.js index 84e0c1a42a7..45ce1a8d29d 100644 --- a/js/src/Ice/ConnectionRequestHandler.js +++ b/js/src/Ice/ConnectionRequestHandler.js @@ -12,12 +12,11 @@ const ReferenceMode = Ice.ReferenceMode; class ConnectionRequestHandler { - constructor(ref, connection, compress) + constructor(ref, connection) { this._reference = ref; this._response = ref.getMode() == ReferenceMode.ModeTwoway; this._connection = connection; - this._compress = compress; } update(previousHandler, newHandler) @@ -47,7 +46,7 @@ class ConnectionRequestHandler sendAsyncRequest(out) { - return out.invokeRemote(this._connection, this._compress, this._response); + return out.invokeRemote(this._connection, this._response); } asyncRequestCanceled(out) diff --git a/js/src/Ice/DefaultsAndOverrides.js b/js/src/Ice/DefaultsAndOverrides.js index 420cdd048ea..753f48c46c1 100644 --- a/js/src/Ice/DefaultsAndOverrides.js +++ b/js/src/Ice/DefaultsAndOverrides.js @@ -11,9 +11,9 @@ const Ice = require("../Ice/ModuleRegistry").Ice; Ice._ModuleRegistry.require(module, [ - "../Ice/FormatType", - "../Ice/EndpointTypes", - "../Ice/Protocol", + "../Ice/FormatType", + "../Ice/EndpointTypes", + "../Ice/Protocol", "../Ice/LocalException" ]); @@ -88,7 +88,6 @@ class DefaultsAndOverrides this.overrideCloseTimeoutValue = -1; } - this.overrideCompress = false; this.overrideSecure = false; value = properties.getPropertyWithDefault("Ice.Default.EndpointSelection", "Random"); diff --git a/js/src/Ice/IncomingAsync.js b/js/src/Ice/IncomingAsync.js index 92d074dda71..b46fd8a5c77 100644 --- a/js/src/Ice/IncomingAsync.js +++ b/js/src/Ice/IncomingAsync.js @@ -33,11 +33,10 @@ const StringUtil = Ice.StringUtil; class IncomingAsync { - constructor(instance, connection, adapter, response, compress, requestId) + constructor(instance, connection, adapter, response, requestId) { this._instance = instance; this._response = response; - this._compress = compress; this._connection = connection; this._format = Ice.FormatType.DefaultFormat; @@ -230,7 +229,7 @@ class IncomingAsync this._os.writeString(ex.operation); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -251,7 +250,7 @@ class IncomingAsync this._os.writeInt(this._current.requestId); this._os.writeByte(Protocol.replyUnknownLocalException); this._os.writeString(ex.unknown); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -272,7 +271,7 @@ class IncomingAsync this._os.writeInt(this._current.requestId); this._os.writeByte(Protocol.replyUnknownUserException); this._os.writeString(ex.unknown); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -293,7 +292,7 @@ class IncomingAsync this._os.writeInt(this._current.requestId); this._os.writeByte(Protocol.replyUnknownException); this._os.writeString(ex.unknown); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -321,7 +320,7 @@ class IncomingAsync s.push(ex.stack); } this._os.writeString(s.join("")); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -339,7 +338,7 @@ class IncomingAsync this._os.startEncapsulation(this._current.encoding, this._format); this._os.writeUserException(ex); this._os.endEncapsulation(); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -361,7 +360,7 @@ class IncomingAsync this._os.writeByte(Protocol.replyUnknownException); //this._os.writeString(ex.toString()); this._os.writeString(ex.toString() + (ex.stack ? "\n" + ex.stack : "")); - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { @@ -526,7 +525,7 @@ class IncomingAsync if(this._response) { - this._connection.sendResponse(this._os, this._compress); + this._connection.sendResponse(this._os); } else { diff --git a/js/src/Ice/ObjectPrx.js b/js/src/Ice/ObjectPrx.js index d68043eac47..506587e90d6 100644 --- a/js/src/Ice/ObjectPrx.js +++ b/js/src/Ice/ObjectPrx.js @@ -44,7 +44,7 @@ class ObjectPrx this._reference = null; this._requestHandler = null; } - + hashCode(r) { return this._reference.hashCode(); @@ -411,19 +411,6 @@ class ObjectPrx } } - ice_compress(co) - { - const ref = this._reference.changeCompress(co); - if(ref.equals(this._reference)) - { - return this; - } - else - { - return this._newInstance(ref); - } - } - ice_timeout(t) { if(t < 1 && t !== -1) @@ -661,7 +648,7 @@ class ObjectPrx } return false; } - + // // Generic invocation for operations that have input parameters. // @@ -878,7 +865,7 @@ class ObjectPrx return false; } - + static ice_staticId() { return this._id; diff --git a/js/src/Ice/OutgoingAsync.js b/js/src/Ice/OutgoingAsync.js index bce53bc29a9..168dd78976f 100644 --- a/js/src/Ice/OutgoingAsync.js +++ b/js/src/Ice/OutgoingAsync.js @@ -102,7 +102,7 @@ class ProxyOutgoingAsyncBase extends OutgoingAsyncBase { this.markFinishedEx(ex); } - + invokeImpl(userThread) { try @@ -182,7 +182,7 @@ class ProxyOutgoingAsyncBase extends OutgoingAsyncBase } super.markFinishedEx.call(this, ex); } - + handleException(ex) { const interval = { value: 0 }; @@ -276,9 +276,9 @@ class OutgoingAsync extends ProxyOutgoingAsyncBase this.markSent(!this._proxy.ice_isTwoway()); } - invokeRemote(connection, compress, response) + invokeRemote(connection, response) { - return connection.sendAsyncRequest(this, compress, response, 0); + return connection.sendAsyncRequest(this, response, 0); } abort(ex) @@ -507,14 +507,14 @@ class ProxyFlushBatch extends ProxyOutgoingAsyncBase this._batchRequestNum = prx._getBatchRequestQueue().swap(this._os); } - invokeRemote(connection, compress, response) + invokeRemote(connection, response) { if(this._batchRequestNum === 0) { this.sent(); return AsyncStatus.Sent; } - return connection.sendAsyncRequest(this, compress, response, this._batchRequestNum); + return connection.sendAsyncRequest(this, response, this._batchRequestNum); } invoke() @@ -531,12 +531,12 @@ class ProxyGetConnection extends ProxyOutgoingAsyncBase super(prx, operation); } - invokeRemote(connection, compress, response) + invokeRemote(connection, response) { this.markFinished(true, r => r.resolve(connection)); return AsyncStatus.Sent; } - + invoke() { this.invokeImpl(true); // userThread = true @@ -563,7 +563,7 @@ class ConnectionFlushBatch extends OutgoingAsyncBase } else { - status = this._connection.sendAsyncRequest(this, false, false, batchRequestNum); + status = this._connection.sendAsyncRequest(this, false, batchRequestNum); } if((status & AsyncStatus.Sent) > 0) @@ -596,7 +596,7 @@ class HeartbeatAsync extends OutgoingAsyncBase this._os.writeByte(0); this._os.writeInt(Protocol.headerSize); // Message size. - let status = this._connection.sendAsyncRequest(this, false, false, 0); + let status = this._connection.sendAsyncRequest(this, false, 0); if((status & AsyncStatus.Sent) > 0) { diff --git a/js/src/Ice/OutgoingConnectionFactory.js b/js/src/Ice/OutgoingConnectionFactory.js index 860d78fd522..48dfabe7ce9 100644 --- a/js/src/Ice/OutgoingConnectionFactory.js +++ b/js/src/Ice/OutgoingConnectionFactory.js @@ -73,7 +73,7 @@ class OutgoingConnectionFactory } // - // Returns a promise, success callback receives (connection, compress) + // Returns a promise, success callback receives the connection // create(endpts, hasMore, selType) { @@ -89,11 +89,10 @@ class OutgoingConnectionFactory // try { - const compress = { value: false }; - const connection = this.findConnectionByEndpoint(endpoints, compress); + const connection = this.findConnectionByEndpoint(endpoints); if(connection !== null) { - return Ice.Promise.resolve([connection, compress.value]); + return Ice.Promise.resolve(connection); } } catch(ex) @@ -217,7 +216,7 @@ class OutgoingConnectionFactory }); } - findConnectionByEndpoint(endpoints, compress) + findConnectionByEndpoint(endpoints) { if(this._destroyed) { @@ -246,14 +245,6 @@ class OutgoingConnectionFactory { if(connectionList[j].isActiveOrHolding()) // Don't return destroyed or un-validated connections { - if(defaultsAndOverrides.overrideCompress) - { - compress.value = defaultsAndOverrides.overrideCompressValue; - } - else - { - compress.value = endpoint.compress(); - } return connectionList[j]; } } @@ -289,7 +280,7 @@ class OutgoingConnectionFactory } } - getConnection(endpoints, cb, compress) + getConnection(endpoints, cb) { if(this._destroyed) { @@ -322,7 +313,7 @@ class OutgoingConnectionFactory // // Search for a matching connection. If we find one, we're done. // - const connection = this.findConnectionByEndpoint(endpoints, compress); + const connection = this.findConnectionByEndpoint(endpoints); if(connection !== null) { return connection; @@ -447,12 +438,8 @@ class OutgoingConnectionFactory callbacks.forEach(cc => cc.removeFromPending()); - const defaultsAndOverrides = this._instance.defaultsAndOverrides(); - const compress = defaultsAndOverrides.overrideCompress ? defaultsAndOverrides.overrideCompressValue : - endpoint.compress(); - callbacks.forEach(cc => cc.getConnection()); - connectionCallbacks.forEach(cc => cc.setConnection(connection, compress)); + connectionCallbacks.forEach(cc => cc.setConnection(connection)); this.checkFinished(); } @@ -748,13 +735,13 @@ class ConnectCallback } } - setConnection(connection, compress) + setConnection(connection) { // // Callback from the factory: the connection to one of the callback // connectors has been established. // - this._promise.resolve([connection, compress]); + this._promise.resolve(connection); this._factory.decPendingConnectCount(); // Must be called last. } @@ -824,8 +811,7 @@ class ConnectCallback // // Ask the factory to get a connection. // - const compress = { value: false }; - const connection = this._factory.getConnection(this._endpoints, this, compress); + const connection = this._factory.getConnection(this._endpoints, this); if(connection === null) { // @@ -837,7 +823,7 @@ class ConnectCallback return; } - this._promise.resolve([connection, compress.value]); + this._promise.resolve(connection); this._factory.decPendingConnectCount(); // Must be called last. } catch(ex) diff --git a/js/src/Ice/Reference.js b/js/src/Ice/Reference.js index 043947c5191..86b4d2ea4ca 100644 --- a/js/src/Ice/Reference.js +++ b/js/src/Ice/Reference.js @@ -877,8 +877,6 @@ class Reference this._encoding = encoding; this._invocationTimeout = invocationTimeout; this._hashInitialized = false; - this._overrideCompress = false; - this._compress = false; // Only used if _overrideCompress == true } getMode() @@ -1083,18 +1081,6 @@ class Reference return r; } - changeCompress(newCompress) - { - if(this._overrideCompress && this._compress === newCompress) - { - return this; - } - const r = this._instance.referenceFactory().copy(this); - r._compress = newCompress; - r._overrideCompress = true; - return r; - } - changeAdapterId(newAdapterId) { // Abstract @@ -1185,11 +1171,6 @@ class Reference } } h = HashUtil.addString(h, this._facet); - h = HashUtil.addBoolean(h, this._overrideCompress); - if(this._overrideCompress) - { - h = HashUtil.addBoolean(h, this._compress); - } h = HashUtil.addHashable(h, this._protocol); h = HashUtil.addHashable(h, this._encoding); h = HashUtil.addNumber(h, this._invocationTimeout); @@ -1424,15 +1405,6 @@ class Reference return false; } - if(this._overrideCompress !== r._overrideCompress) - { - return false; - } - if(this._overrideCompress && this._compress !== r._compress) - { - return false; - } - if(!this._protocol.equals(r._protocol)) { return false; @@ -1464,8 +1436,6 @@ class Reference // Copy the members that are not passed to the constructor. // r._context = this._context; - r._overrideCompress = this._overrideCompress; - r._compress = this._compress; } } @@ -1644,23 +1614,9 @@ class FixedReference extends Reference this._fixedConnection.throwException(); // Throw in case our connection is already destroyed. - let compress; - if(defaultsAndOverrides.overrideCompress) - { - compress = defaultsAndOverrides.overrideCompressValue; - } - else if(this._overrideCompress) - { - compress = this._compress; - } - else - { - compress = this._fixedConnection.endpoint().compress(); - } - - return proxy._setRequestHandler(new ConnectionRequestHandler(this, this._fixedConnection, compress)); + return proxy._setRequestHandler(new ConnectionRequestHandler(this, this._fixedConnection)); } - + getBatchRequestQueue() { return this._fixedConnection.getBatchRequestQueue(); @@ -1775,16 +1731,6 @@ class RoutableReference extends Reference return r; } - changeCompress(newCompress) - { - const r = super.changeCompress(newCompress); - if(r !== this && this._endpoints.length > 0) // Also override the compress flag on the endpoints if it was updated. - { - r._endpoints = this._endpoints.map(endpoint => endpoint.changeCompress(newCompress)); - } - return r; - } - changeAdapterId(newAdapterId) { if(this._adapterId === newAdapterId) @@ -2094,7 +2040,7 @@ class RoutableReference extends Reference getConnection() { - const p = new Ice.Promise(); // success callback receives (connection, compress) + const p = new Ice.Promise(); // success callback receives (connection) if(this._routerInfo !== null) { @@ -2225,10 +2171,6 @@ class RoutableReference extends Reference for(let i = 0; i < endpts.length; ++i) { endpts[i] = endpts[i].changeConnectionId(this._connectionId); - if(this._overrideCompress) - { - endpts[i] = endpts[i].changeCompress(this._compress); - } if(this._overrideTimeout) { endpts[i] = endpts[i].changeTimeout(this._timeout); @@ -2352,7 +2294,7 @@ class RoutableReference extends Reference // const cb = new CreateConnectionCallback(this, null, promise); factory.create(endpoints, false, this.getEndpointSelection()).then( - values => cb.setConnection(values)).catch(ex => cb.setException(ex)); + connection => cb.setConnection(connection)).catch(ex => cb.setException(ex)); } else { @@ -2365,7 +2307,7 @@ class RoutableReference extends Reference // const cb = new CreateConnectionCallback(this, endpoints, promise); factory.create([ endpoints[0] ], true, this.getEndpointSelection()).then( - values => cb.setConnection(values)).catch(ex => cb.setException(ex)); + connection => cb.setConnection(connection)).catch(ex => cb.setException(ex)); } return promise; } @@ -2385,9 +2327,8 @@ class CreateConnectionCallback this.exception = null; } - setConnection(values) + setConnection(connection) { - const [connection, compress] = values; // // If we have a router, set the object adapter for this router // (if any) to the new connection, so that callbacks from the @@ -2397,7 +2338,7 @@ class CreateConnectionCallback { connection.setAdapter(this.ref.getRouterInfo().getAdapter()); } - this.promise.resolve(values); + this.promise.resolve(connection); } setException(ex) @@ -2414,9 +2355,9 @@ class CreateConnectionCallback } this.ref.getInstance().outgoingConnectionFactory().create( - [ this.endpoints[this.i] ], - this.i != this.endpoints.length - 1, - this.ref.getEndpointSelection()).then(values => this.setConnection(values)) + [ this.endpoints[this.i] ], + this.i != this.endpoints.length - 1, + this.ref.getEndpointSelection()).then(connection => this.setConnection(connection)) .catch(ex => this.setException(ex)); } } diff --git a/js/src/Ice/RequestHandlerFactory.js b/js/src/Ice/RequestHandlerFactory.js index fdfbcffb766..5256b61e950 100644 --- a/js/src/Ice/RequestHandlerFactory.js +++ b/js/src/Ice/RequestHandlerFactory.js @@ -50,9 +50,9 @@ class RequestHandlerFactory if(connect) { - ref.getConnection().then(values => + ref.getConnection().then(connection => { - handler.setConnection(values); + handler.setConnection(connection); }, ex => { diff --git a/js/test/Ice/proxy/Client.js b/js/test/Ice/proxy/Client.js index e8d14bc0254..a93eae5b46b 100644 --- a/js/test/Ice/proxy/Client.js +++ b/js/test/Ice/proxy/Client.js @@ -698,8 +698,9 @@ test(compObj.ice_connectionId("id1").ice_getConnectionId() === "id1"); test(compObj.ice_connectionId("id2").ice_getConnectionId() === "id2"); - test(compObj.ice_compress(true).equals(compObj.ice_compress(true))); - test(!compObj.ice_compress(false).equals(compObj.ice_compress(true))); + // Proxy doesn't support ice_compress + //test(compObj.ice_compress(true).equals(compObj.ice_compress(true))); + //test(!compObj.ice_compress(false).equals(compObj.ice_compress(true))); test(compObj.ice_timeout(20).equals(compObj.ice_timeout(20))); test(!compObj.ice_timeout(10).equals(compObj.ice_timeout(20))); diff --git a/objective-c/src/Ice/CommunicatorI.mm b/objective-c/src/Ice/CommunicatorI.mm index 5bd82866953..050e5d9b0e7 100644 --- a/objective-c/src/Ice/CommunicatorI.mm +++ b/objective-c/src/Ice/CommunicatorI.mm @@ -445,12 +445,12 @@ { @throw [ICEFeatureNotSupportedException featureNotSupportedException:__FILE__ line:__LINE__]; } --(void) flushBatchRequests +-(void) flushBatchRequests:(ICECompressBatch)compress { NSException* nsex = nil; try { - COMMUNICATOR->flushBatchRequests(); + COMMUNICATOR->flushBatchRequests((Ice::CompressBatch)compress); } catch(const std::exception& ex) { @@ -461,22 +461,24 @@ @throw nsex; } } --(id<ICEAsyncResult>) begin_flushBatchRequests +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress { return beginCppCall(^(Ice::AsyncResultPtr& result) { - result = COMMUNICATOR->begin_flushBatchRequests(); + result = COMMUNICATOR->begin_flushBatchRequests((Ice::CompressBatch)compress); }); } --(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress exception:(void(^)(ICEException*))exception { - return [self begin_flushBatchRequests:exception sent:nil]; + return [self begin_flushBatchRequests:compress exception:exception sent:nil]; } --(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress + exception:(void(^)(ICEException*))exception + sent:(void(^)(BOOL))sent { return beginCppCall(^(Ice::AsyncResultPtr& result, const Ice::CallbackPtr& cb) { - result = COMMUNICATOR->begin_flushBatchRequests(cb); + result = COMMUNICATOR->begin_flushBatchRequests((Ice::CompressBatch)compress, cb); }, ^(const Ice::AsyncResultPtr& result) { COMMUNICATOR->end_flushBatchRequests(result); diff --git a/objective-c/src/Ice/ConnectionI.mm b/objective-c/src/Ice/ConnectionI.mm index 07f6b6be8bd..f1775726ee8 100644 --- a/objective-c/src/Ice/ConnectionI.mm +++ b/objective-c/src/Ice/ConnectionI.mm @@ -326,12 +326,12 @@ private: @throw nsex; return nil; // Keep the compiler happy. } --(void) flushBatchRequests +-(void) flushBatchRequests:(ICECompressBatch)compress { NSException* nsex = nil; try { - CONNECTION->flushBatchRequests(); + CONNECTION->flushBatchRequests((Ice::CompressBatch)compress); } catch(const std::exception& ex) { @@ -342,22 +342,24 @@ private: @throw nsex; } } --(id<ICEAsyncResult>) begin_flushBatchRequests +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress { return beginCppCall(^(Ice::AsyncResultPtr& result) { - result = CONNECTION->begin_flushBatchRequests(); + result = CONNECTION->begin_flushBatchRequests((Ice::CompressBatch)compress); }); } --(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress exception:(void(^)(ICEException*))exception { - return [self begin_flushBatchRequests:exception sent:nil]; + return [self begin_flushBatchRequests:compress exception:exception sent:nil]; } --(id<ICEAsyncResult>) begin_flushBatchRequests:(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent +-(id<ICEAsyncResult>) begin_flushBatchRequests:(ICECompressBatch)compress + exception:(void(^)(ICEException*))exception + sent:(void(^)(BOOL))sent { return beginCppCall(^(Ice::AsyncResultPtr& result, const Ice::CallbackPtr& cb) { - result = CONNECTION->begin_flushBatchRequests(cb); + result = CONNECTION->begin_flushBatchRequests((Ice::CompressBatch)compress, cb); }, ^(const Ice::AsyncResultPtr& result) { CONNECTION->end_flushBatchRequests(result); diff --git a/objective-c/test/Ice/ami/AllTests.m b/objective-c/test/Ice/ami/AllTests.m index 3f1896cb4e7..dfa478c18d1 100644 --- a/objective-c/test/Ice/ami/AllTests.m +++ b/objective-c/test/Ice/ami/AllTests.m @@ -585,12 +585,13 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) [b1 opBatch]; [b1 opBatch]; TestAMICallback* cb = [TestAMICallback create]; - id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:^(ICEException* ex) - { - test(NO); - } - sent:^(BOOL sentSynchronously) { [cb called]; }]; - [cb check]; + id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:ICEBasedOnProxy + exception:^(ICEException* ex) + { + test(NO); + } + sent:^(BOOL sentSynchronously) { [cb called]; }]; + [cb check]; test([r isSent]); test([r isCompleted]); test([p waitForBatch:2]); @@ -605,8 +606,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) [b1 opBatch]; [[b1 ice_getConnection] close:ICECloseGracefullyAndWait]; TestAMICallback* cb = [TestAMICallback create]; - id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests: - ^(ICEException* ex) { [cb called]; } + id<ICEAsyncResult> r = [[b1 ice_getConnection] begin_flushBatchRequests:ICEBasedOnProxy + exception:^(ICEException* ex) { [cb called]; } sent:^(BOOL sentSynchronously) { test(NO); }]; [cb check]; test(![r isSent]); @@ -627,7 +628,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) [b1 opBatch]; [b1 opBatch]; TestAMICallback* cb = [TestAMICallback create]; - id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:^(ICEException* ex) { test(NO); } + id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:ICEBasedOnProxy + exception:^(ICEException* ex) { test(NO); } sent:^(BOOL sentSynchronously) { [cb called]; }]; [cb check]; test([r isSent]); @@ -644,7 +646,8 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) [b1 opBatch]; [[b1 ice_getConnection] close:ICECloseGracefullyAndWait]; TestAMICallback* cb = [TestAMICallback create]; - id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:^(ICEException* ex) { test(NO); } + id<ICEAsyncResult> r = [communicator begin_flushBatchRequests:ICEBasedOnProxy + exception:^(ICEException* ex) { test(NO); } sent:^(BOOL sentSynchronously) { [cb called]; }]; [cb check]; test([r isSent]); @@ -758,7 +761,7 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) id<ICEConnection> con = [p ice_getConnection]; p2 = [p ice_batchOneway]; [p2 ice_ping]; - r = [con begin_flushBatchRequests]; + r = [con begin_flushBatchRequests:ICEBasedOnProxy]; test([[r getConnection] isEqual:con]); test([r getCommunicator] == communicator); test([r getProxy] == nil); // Expected @@ -769,7 +772,7 @@ amiAllTests(id<ICECommunicator> communicator, BOOL collocated) // p2 = [p ice_batchOneway]; [p2 ice_ping]; - r = [communicator begin_flushBatchRequests]; + r = [communicator begin_flushBatchRequests:ICEBasedOnProxy]; test([r getConnection] == nil); // Expected test([r getCommunicator] == communicator); test([r getProxy] == nil); // Expected diff --git a/php/src/php5/Communicator.cpp b/php/src/php5/Communicator.cpp index c22723e53d7..964a255599d 100644 --- a/php/src/php5/Communicator.cpp +++ b/php/src/php5/Communicator.cpp @@ -807,17 +807,25 @@ ZEND_METHOD(Ice_Communicator, setDefaultLocator) ZEND_METHOD(Ice_Communicator, flushBatchRequests) { - CommunicatorInfoIPtr _this = Wrapper<CommunicatorInfoIPtr>::value(getThis() TSRMLS_CC); - assert(_this); + zval* compress; + if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, const_cast<char*>("z"), &compress TSRMLS_CC) != SUCCESS) + { + RETURN_NULL(); + } - if(ZEND_NUM_ARGS() != 8) + if(Z_TYPE_P(compress) != IS_LONG) { - WRONG_PARAM_COUNT; + invalidArgument("value for 'compress' argument must be an enumerator of CompressBatch" TSRMLS_CC); + RETURN_NULL(); } + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(Z_LVAL_P(compress)); + + CommunicatorInfoIPtr _this = Wrapper<CommunicatorInfoIPtr>::value(getThis() TSRMLS_CC); + assert(_this); try { - _this->getCommunicator()->flushBatchRequests(); + _this->getCommunicator()->flushBatchRequests(cb); } catch(const IceUtil::Exception& ex) { diff --git a/php/src/php5/Connection.cpp b/php/src/php5/Connection.cpp index 68827e63243..12aa1a7c2aa 100644 --- a/php/src/php5/Connection.cpp +++ b/php/src/php5/Connection.cpp @@ -128,17 +128,25 @@ ZEND_METHOD(Ice_Connection, getEndpoint) ZEND_METHOD(Ice_Connection, flushBatchRequests) { - if(ZEND_NUM_ARGS() > 0) + zval* compress; + if(zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, const_cast<char*>("z"), &compress TSRMLS_CC) != SUCCESS) { - WRONG_PARAM_COUNT; + RETURN_NULL(); + } + + if(Z_TYPE_P(compress) != IS_LONG) + { + invalidArgument("value for 'compress' argument must be an enumerator of CompressBatch" TSRMLS_CC); + RETURN_NULL(); } + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(Z_LVAL_P(compress)); Ice::ConnectionPtr _this = Wrapper<Ice::ConnectionPtr>::value(getThis() TSRMLS_CC); assert(_this); try { - _this->flushBatchRequests(); + _this->flushBatchRequests(cb); } catch(const IceUtil::Exception& ex) { diff --git a/php/test/Ice/operations/Client.php b/php/test/Ice/operations/Client.php index 9a79908a125..08b0838674e 100644 --- a/php/test/Ice/operations/Client.php +++ b/php/test/Ice/operations/Client.php @@ -1090,6 +1090,8 @@ function twoways($communicator, $p) function allTests($communicator) { + global $NS; + $ref = "test:default -p 12010"; $base = $communicator->stringToProxy($ref); $cl = $base->ice_checkedCast("::Test::MyClass"); @@ -1102,10 +1104,17 @@ function allTests($communicator) $derived->opDerived(); echo "ok\n"; + # Test flush batch requests methods + $BasedOnProxy = $NS ? constant("Ice\\CompressBatch::BasedOnProxy") : constant("Ice_CompressBatch::BasedOnProxy"); + + $derived->ice_flushBatchRequests(); + $derived->ice_getConnection()->flushBatchRequests($BasedOnProxy); + $derived->ice_getCommunicator()->flushBatchRequests($BasedOnProxy); + return $cl; } -$communicator = $NS ? eval("return Ice\\initialize(\$argv);") : +$communicator = $NS ? eval("return Ice\\initialize(\$argv);") : eval("return Ice_initialize(\$argv);"); $myClass = allTests($communicator); diff --git a/python/modules/IcePy/Communicator.cpp b/python/modules/IcePy/Communicator.cpp index 927c3cab5d2..37a99a78641 100644 --- a/python/modules/IcePy/Communicator.cpp +++ b/python/modules/IcePy/Communicator.cpp @@ -701,13 +701,24 @@ communicatorIdentityToString(CommunicatorObject* self, PyObject* args) extern "C" #endif static PyObject* -communicatorFlushBatchRequests(CommunicatorObject* self) +communicatorFlushBatchRequests(CommunicatorObject* self, PyObject* args) { + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + PyObject* compressBatch; + if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch)) + { + return 0; + } + + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + assert(self->communicator); try { AllowThreads allowThreads; // Release Python's global interpreter lock to avoid a potential deadlock. - (*self->communicator)->flushBatchRequests(); + (*self->communicator)->flushBatchRequests(cb); } catch(const Ice::Exception& ex) { @@ -723,13 +734,24 @@ communicatorFlushBatchRequests(CommunicatorObject* self) extern "C" #endif static PyObject* -communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* /*args*/, PyObject* /*kwds*/) +communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* args, PyObject* /*kwds*/) { + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + PyObject* compressBatch; + if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch)) + { + return 0; + } + + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + assert(self->communicator); const string op = "flushBatchRequests"; FlushAsyncCallbackPtr d = new FlushAsyncCallback(op); - Ice::Callback_Communicator_flushBatchRequestsPtr cb = + Ice::Callback_Communicator_flushBatchRequestsPtr callback = Ice::newCallback_Communicator_flushBatchRequests(d, &FlushAsyncCallback::exception, &FlushAsyncCallback::sent); Ice::AsyncResultPtr result; @@ -738,7 +760,7 @@ communicatorFlushBatchRequestsAsync(CommunicatorObject* self, PyObject* /*args*/ { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->communicator)->begin_flushBatchRequests(cb); + result = (*self->communicator)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) { @@ -771,17 +793,29 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py static char* argNames[] = { + const_cast<char*>("compress"), const_cast<char*>("_ex"), const_cast<char*>("_sent"), 0 }; + PyObject* compressBatch; PyObject* ex = Py_None; PyObject* sent = Py_None; - if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("|OO"), argNames, &ex, &sent)) + if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("O|OO"), argNames, &compressBatch, &ex, &sent)) + { + return 0; + } + + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + if(!PyObject_IsInstance(compressBatch, reinterpret_cast<PyObject*>(compressBatchType))) { return 0; } + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + if(ex == Py_None) { ex = 0; @@ -798,11 +832,11 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py return 0; } - Ice::Callback_Communicator_flushBatchRequestsPtr cb; + Ice::Callback_Communicator_flushBatchRequestsPtr callback; if(ex || sent) { FlushCallbackPtr d = new FlushCallback(ex, sent, "flushBatchRequests"); - cb = Ice::newCallback_Communicator_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent); + callback = Ice::newCallback_Communicator_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent); } Ice::AsyncResultPtr result; @@ -810,13 +844,13 @@ communicatorBeginFlushBatchRequests(CommunicatorObject* self, PyObject* args, Py { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) + if(callback) { - result = (*self->communicator)->begin_flushBatchRequests(cb); + result = (*self->communicator)->begin_flushBatchRequests(cb, callback); } else { - result = (*self->communicator)->begin_flushBatchRequests(); + result = (*self->communicator)->begin_flushBatchRequests(cb); } } catch(const Ice::Exception& ex) @@ -1648,13 +1682,13 @@ static PyMethodDef CommunicatorMethods[] = PyDoc_STR(STRCAST("getDefaultLocator() -> proxy")) }, { STRCAST("setDefaultLocator"), reinterpret_cast<PyCFunction>(communicatorSetDefaultLocator), METH_VARARGS, PyDoc_STR(STRCAST("setDefaultLocator(proxy) -> None")) }, - { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequests), METH_NOARGS, - PyDoc_STR(STRCAST("flushBatchRequests() -> None")) }, + { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequests), METH_VARARGS, + PyDoc_STR(STRCAST("flushBatchRequests(compress) -> None")) }, { STRCAST("flushBatchRequestsAsync"), reinterpret_cast<PyCFunction>(communicatorFlushBatchRequestsAsync), - METH_NOARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync() -> Ice.Future")) }, + METH_VARARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync(compress) -> Ice.Future")) }, { STRCAST("begin_flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorBeginFlushBatchRequests), METH_VARARGS | METH_KEYWORDS, - PyDoc_STR(STRCAST("begin_flushBatchRequests([_ex][, _sent]) -> Ice.AsyncResult")) }, + PyDoc_STR(STRCAST("begin_flushBatchRequests(compress[, _ex][, _sent]) -> Ice.AsyncResult")) }, { STRCAST("end_flushBatchRequests"), reinterpret_cast<PyCFunction>(communicatorEndFlushBatchRequests), METH_VARARGS, PyDoc_STR(STRCAST("end_flushBatchRequests(Ice.AsyncResult) -> None")) }, { STRCAST("createAdmin"), reinterpret_cast<PyCFunction>(communicatorCreateAdmin), METH_VARARGS, diff --git a/python/modules/IcePy/Connection.cpp b/python/modules/IcePy/Connection.cpp index 8e9e9286d02..056dc42e4ab 100644 --- a/python/modules/IcePy/Connection.cpp +++ b/python/modules/IcePy/Connection.cpp @@ -418,13 +418,24 @@ connectionGetAdapter(ConnectionObject* self) extern "C" #endif static PyObject* -connectionFlushBatchRequests(ConnectionObject* self) +connectionFlushBatchRequests(ConnectionObject* self, PyObject* args) { + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + PyObject* compressBatch; + if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch)) + { + return 0; + } + + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + assert(self->connection); try { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - (*self->connection)->flushBatchRequests(); + (*self->connection)->flushBatchRequests(cb); } catch(const Ice::Exception& ex) { @@ -440,13 +451,24 @@ connectionFlushBatchRequests(ConnectionObject* self) extern "C" #endif static PyObject* -connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* /*args*/, PyObject* /*kwds*/) +connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* args, PyObject* /*kwds*/) { + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + PyObject* compressBatch; + if(!PyArg_ParseTuple(args, STRCAST("O!"), compressBatchType, &compressBatch)) + { + return 0; + } + + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + assert(self->connection); const string op = "flushBatchRequests"; FlushAsyncCallbackPtr d = new FlushAsyncCallback(op); - Ice::Callback_Connection_flushBatchRequestsPtr cb = + Ice::Callback_Connection_flushBatchRequestsPtr callback = Ice::newCallback_Connection_flushBatchRequests(d, &FlushAsyncCallback::exception, &FlushAsyncCallback::sent); Ice::AsyncResultPtr result; @@ -455,7 +477,7 @@ connectionFlushBatchRequestsAsync(ConnectionObject* self, PyObject* /*args*/, Py { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - result = (*self->connection)->begin_flushBatchRequests(cb); + result = (*self->connection)->begin_flushBatchRequests(cb, callback); } catch(const Ice::Exception& ex) { @@ -490,17 +512,29 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje static char* argNames[] = { + const_cast<char*>("compress"), const_cast<char*>("_ex"), const_cast<char*>("_sent"), 0 }; + PyObject* compressBatch; PyObject* ex = Py_None; PyObject* sent = Py_None; - if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("|OO"), argNames, &ex, &sent)) + if(!PyArg_ParseTupleAndKeywords(args, kwds, STRCAST("O|OO"), argNames, &compressBatch, &ex, &sent)) + { + return 0; + } + + PyObject* compressBatchType = lookupType("Ice.CompressBatch"); + if(!PyObject_IsInstance(compressBatch, reinterpret_cast<PyObject*>(compressBatchType))) { return 0; } + PyObjectHandle v = PyObject_GetAttrString(compressBatch, STRCAST("_value")); + assert(v.get()); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(PyLong_AsLong(v.get())); + if(ex == Py_None) { ex = 0; @@ -517,11 +551,11 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje return 0; } - Ice::Callback_Connection_flushBatchRequestsPtr cb; + Ice::Callback_Connection_flushBatchRequestsPtr callback; if(ex || sent) { FlushCallbackPtr d = new FlushCallback(ex, sent, "flushBatchRequests"); - cb = Ice::newCallback_Connection_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent); + callback = Ice::newCallback_Connection_flushBatchRequests(d, &FlushCallback::exception, &FlushCallback::sent); } Ice::AsyncResultPtr result; @@ -529,13 +563,13 @@ connectionBeginFlushBatchRequests(ConnectionObject* self, PyObject* args, PyObje { AllowThreads allowThreads; // Release Python's global interpreter lock during remote invocations. - if(cb) + if(callback) { - result = (*self->connection)->begin_flushBatchRequests(cb); + result = (*self->connection)->begin_flushBatchRequests(cb, callback); } else { - result = (*self->connection)->begin_flushBatchRequests(); + result = (*self->connection)->begin_flushBatchRequests(cb); } } catch(const Ice::Exception& ex) @@ -1059,19 +1093,19 @@ connectionThrowException(ConnectionObject* self) static PyMethodDef ConnectionMethods[] = { { STRCAST("close"), reinterpret_cast<PyCFunction>(connectionClose), METH_VARARGS, - PyDoc_STR(STRCAST("close(bool) -> None")) }, + PyDoc_STR(STRCAST("close(Ice.ConnectionClose) -> None")) }, { STRCAST("createProxy"), reinterpret_cast<PyCFunction>(connectionCreateProxy), METH_VARARGS, PyDoc_STR(STRCAST("createProxy(Ice.Identity) -> Ice.ObjectPrx")) }, { STRCAST("setAdapter"), reinterpret_cast<PyCFunction>(connectionSetAdapter), METH_VARARGS, PyDoc_STR(STRCAST("setAdapter(Ice.ObjectAdapter) -> None")) }, { STRCAST("getAdapter"), reinterpret_cast<PyCFunction>(connectionGetAdapter), METH_NOARGS, PyDoc_STR(STRCAST("getAdapter() -> Ice.ObjectAdapter")) }, - { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequests), METH_NOARGS, - PyDoc_STR(STRCAST("flushBatchRequests() -> None")) }, + { STRCAST("flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequests), METH_VARARGS, + PyDoc_STR(STRCAST("flushBatchRequests(Ice.CompressBatch) -> None")) }, { STRCAST("flushBatchRequestsAsync"), reinterpret_cast<PyCFunction>(connectionFlushBatchRequestsAsync), - METH_NOARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync() -> Ice.Future")) }, + METH_VARARGS, PyDoc_STR(STRCAST("flushBatchRequestsAsync(Ice.CompressBatch) -> Ice.Future")) }, { STRCAST("begin_flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionBeginFlushBatchRequests), - METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_flushBatchRequests([_ex][, _sent]) -> Ice.AsyncResult")) }, + METH_VARARGS | METH_KEYWORDS, PyDoc_STR(STRCAST("begin_flushBatchRequests(Ice.CompressBatch, [_ex][, _sent]) -> Ice.AsyncResult")) }, { STRCAST("end_flushBatchRequests"), reinterpret_cast<PyCFunction>(connectionEndFlushBatchRequests), METH_VARARGS, PyDoc_STR(STRCAST("end_flushBatchRequests(Ice.AsyncResult) -> None")) }, { STRCAST("setCloseCallback"), reinterpret_cast<PyCFunction>(connectionSetCloseCallback), METH_VARARGS, diff --git a/python/python/Ice.py b/python/python/Ice.py index 80b48c639fc..9ce81306fb7 100644 --- a/python/python/Ice.py +++ b/python/python/Ice.py @@ -938,14 +938,14 @@ class CommunicatorI(Communicator): def getPluginManager(self): raise RuntimeError("operation `getPluginManager' not implemented") - def flushBatchRequests(self): - self._impl.flushBatchRequests() + def flushBatchRequests(self, compress): + self._impl.flushBatchRequests(compress) - def flushBatchRequestsAsync(self): - return self._impl.flushBatchRequestsAsync() + def flushBatchRequestsAsync(self, compress): + return self._impl.flushBatchRequestsAsync(compress) - def begin_flushBatchRequests(self, _ex=None, _sent=None): - return self._impl.begin_flushBatchRequests(_ex, _sent) + def begin_flushBatchRequests(self, compress, _ex=None, _sent=None): + return self._impl.begin_flushBatchRequests(compress, _ex, _sent) def end_flushBatchRequests(self, r): return self._impl.end_flushBatchRequests(r) diff --git a/python/test/Ice/ami/AllTests.py b/python/test/Ice/ami/AllTests.py index d29311fd9d6..78c71233218 100644 --- a/python/test/Ice/ami/AllTests.py +++ b/python/test/Ice/ami/AllTests.py @@ -805,7 +805,7 @@ def allTests(communicator, collocated): b1.opBatch() b1.opBatch() cb = FlushCallback() - r = b1.ice_getConnection().begin_flushBatchRequests(cb.exception, cb.sent) + r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) test(r.isCompleted()) @@ -819,7 +819,8 @@ def allTests(communicator, collocated): b1.opBatch() b1.opBatch() cb = FlushCallback(cookie) - r = b1.ice_getConnection().begin_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie), + r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + lambda ex: cb.exceptionWC(ex, cookie), lambda ss: cb.sentWC(ss, cookie)) cb.check() test(p.waitForBatch(2)) @@ -832,7 +833,7 @@ def allTests(communicator, collocated): b1.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushExCallback() - r = b1.ice_getConnection().begin_flushBatchRequests(cb.exception, cb.sent) + r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(not r.isSent()) test(r.isCompleted()) @@ -846,7 +847,8 @@ def allTests(communicator, collocated): b1.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushExCallback(cookie) - r = b1.ice_getConnection().begin_flushBatchRequests(lambda ex: cb.exceptionWC(ex, cookie), + r = b1.ice_getConnection().begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, + lambda ex: cb.exceptionWC(ex, cookie), lambda ss: cb.sentWC(ss, cookie)) cb.check() test(p.opBatchCount() == 0) @@ -864,7 +866,7 @@ def allTests(communicator, collocated): b1.opBatch() b1.opBatch() cb = FlushCallback() - r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) test(r.isCompleted()) @@ -878,7 +880,7 @@ def allTests(communicator, collocated): b1.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() - r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) # Exceptions are ignored! test(r.isCompleted()) @@ -897,7 +899,7 @@ def allTests(communicator, collocated): b2.opBatch() b2.opBatch() cb = FlushCallback() - r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) test(r.isCompleted()) @@ -918,7 +920,7 @@ def allTests(communicator, collocated): b2.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() - r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) # Exceptions are ignored! test(r.isCompleted()) @@ -939,7 +941,7 @@ def allTests(communicator, collocated): b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FlushCallback() - r = communicator.begin_flushBatchRequests(cb.exception, cb.sent) + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy, cb.exception, cb.sent) cb.check() test(r.isSent()) # Exceptions are ignored! test(r.isCompleted()) @@ -1041,7 +1043,7 @@ def allTests(communicator, collocated): con = p.ice_getConnection() p2 = p.ice_batchOneway() p2.ice_ping() - r = con.begin_flushBatchRequests() + r = con.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy) test(r.getConnection() == con) test(r.getCommunicator() == communicator) test(r.getProxy() == None) # Expected @@ -1052,7 +1054,7 @@ def allTests(communicator, collocated): # p2 = p.ice_batchOneway() p2.ice_ping() - r = communicator.begin_flushBatchRequests() + r = communicator.begin_flushBatchRequests(Ice.CompressBatch.BasedOnProxy) test(r.getConnection() == None) # Expected test(r.getCommunicator() == communicator) test(r.getProxy() == None) # Expected @@ -1191,7 +1193,7 @@ def allTests(communicator, collocated): # # This test requires two threads in the server's thread pool: one will block in sleep() and the other # will process the CloseConnection message. - # + # p.ice_ping() con = p.ice_getConnection() r = p.begin_sleep(1000) @@ -1232,7 +1234,7 @@ def allTests(communicator, collocated): # # Local case: start a lengthy operation and then close the connection forcefully on the client side. # There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. - # + # p.ice_ping() con = p.ice_getConnection() r = p.begin_sleep(100) @@ -1520,7 +1522,7 @@ def allTestsFuture(communicator, collocated): b1.opBatch() b1.opBatch() cb = FutureFlushCallback() - f = b1.ice_getConnection().flushBatchRequestsAsync() + f = b1.ice_getConnection().flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1533,7 +1535,7 @@ def allTestsFuture(communicator, collocated): b1.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushExCallback() - f = b1.ice_getConnection().flushBatchRequestsAsync() + f = b1.ice_getConnection().flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_done_callback(cb.exception) f.add_sent_callback(cb.sent) cb.check() @@ -1554,7 +1556,7 @@ def allTestsFuture(communicator, collocated): b1.opBatch() b1.opBatch() cb = FutureFlushCallback() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1570,7 +1572,7 @@ def allTestsFuture(communicator, collocated): b1.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1591,7 +1593,7 @@ def allTestsFuture(communicator, collocated): b2.opBatch() b2.opBatch() cb = FutureFlushCallback() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1614,7 +1616,7 @@ def allTestsFuture(communicator, collocated): b2.opBatch() b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1637,7 +1639,7 @@ def allTestsFuture(communicator, collocated): b1.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) b2.ice_getConnection().close(Ice.ConnectionClose.CloseGracefullyAndWait) cb = FutureFlushCallback() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) f.add_sent_callback(cb.sent) cb.check() f.result() # Wait until finished. @@ -1740,7 +1742,7 @@ def allTestsFuture(communicator, collocated): con = p.ice_getConnection() p2 = p.ice_batchOneway() p2.ice_ping() - f = con.flushBatchRequestsAsync() + f = con.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) test(f.connection() == con) test(f.communicator() == communicator) test(f.proxy() == None) # Expected @@ -1751,7 +1753,7 @@ def allTestsFuture(communicator, collocated): # p2 = p.ice_batchOneway() p2.ice_ping() - f = communicator.flushBatchRequestsAsync() + f = communicator.flushBatchRequestsAsync(Ice.CompressBatch.BasedOnProxy) test(f.connection() == None) # Expected test(f.communicator() == communicator) test(f.proxy() == None) # Expected diff --git a/python/test/Ice/operations/BatchOneways.py b/python/test/Ice/operations/BatchOneways.py index ea0fc26c29c..3b17cd89954 100644 --- a/python/test/Ice/operations/BatchOneways.py +++ b/python/test/Ice/operations/BatchOneways.py @@ -64,6 +64,9 @@ def batchOneways(p): batch = Test.MyClassPrx.uncheckedCast(p.ice_batchOneway()) batch.ice_flushBatchRequests() # Empty flush + if batch.ice_getConnection(): + batch.ice_getConnection().flushBatchRequests(Ice.CompressBatch.BasedOnProxy) + batch.ice_getCommunicator().flushBatchRequests(Ice.CompressBatch.BasedOnProxy) p.opByteSOnewayCallCount() # Reset the call count diff --git a/ruby/src/IceRuby/Communicator.cpp b/ruby/src/IceRuby/Communicator.cpp index 232092dd9bd..cb7c24fd9a5 100644 --- a/ruby/src/IceRuby/Communicator.cpp +++ b/ruby/src/IceRuby/Communicator.cpp @@ -613,12 +613,22 @@ IceRuby_Communicator_setDefaultLocator(VALUE self, VALUE locator) extern "C" VALUE -IceRuby_Communicator_flushBatchRequests(VALUE self) +IceRuby_Communicator_flushBatchRequests(VALUE self, VALUE compress) { ICE_RUBY_TRY { Ice::CommunicatorPtr p = getCommunicator(self); - p->flushBatchRequests(); + + volatile VALUE type = callRuby(rb_path2class, "Ice::CompressBatch"); + if(callRuby(rb_obj_is_instance_of, compress, type) != Qtrue) + { + throw RubyException(rb_eTypeError, + "value for 'compress' argument must be an enumerator of Ice::CompressBatch"); + } + volatile VALUE compressValue = callRuby(rb_funcall, compress, rb_intern("to_i"), 0); + assert(TYPE(compressValue) == T_FIXNUM); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(FIX2LONG(compressValue)); + p->flushBatchRequests(cb); } ICE_RUBY_CATCH return Qnil; @@ -651,7 +661,7 @@ IceRuby::initCommunicator(VALUE iceModule) rb_define_method(_communicatorClass, "setDefaultRouter", CAST_METHOD(IceRuby_Communicator_setDefaultRouter), 1); rb_define_method(_communicatorClass, "getDefaultLocator", CAST_METHOD(IceRuby_Communicator_getDefaultLocator), 0); rb_define_method(_communicatorClass, "setDefaultLocator", CAST_METHOD(IceRuby_Communicator_setDefaultLocator), 1); - rb_define_method(_communicatorClass, "flushBatchRequests", CAST_METHOD(IceRuby_Communicator_flushBatchRequests), 0); + rb_define_method(_communicatorClass, "flushBatchRequests", CAST_METHOD(IceRuby_Communicator_flushBatchRequests), 1); } Ice::CommunicatorPtr diff --git a/ruby/src/IceRuby/Connection.cpp b/ruby/src/IceRuby/Connection.cpp index 2c5f65a8434..9c0a4b76ed1 100644 --- a/ruby/src/IceRuby/Connection.cpp +++ b/ruby/src/IceRuby/Connection.cpp @@ -57,7 +57,7 @@ IceRuby_Connection_close(VALUE self, VALUE mode) if(callRuby(rb_obj_is_instance_of, mode, type) != Qtrue) { throw RubyException(rb_eTypeError, - "value for 'mode' argument must be an enumerator of Ice.ConnectionClose"); + "value for 'mode' argument must be an enumerator of Ice::ConnectionClose"); } volatile VALUE modeValue = callRuby(rb_funcall, mode, rb_intern("to_i"), 0); assert(TYPE(modeValue) == T_FIXNUM); @@ -70,14 +70,23 @@ IceRuby_Connection_close(VALUE self, VALUE mode) extern "C" VALUE -IceRuby_Connection_flushBatchRequests(VALUE self) +IceRuby_Connection_flushBatchRequests(VALUE self, VALUE compress) { ICE_RUBY_TRY { Ice::ConnectionPtr* p = reinterpret_cast<Ice::ConnectionPtr*>(DATA_PTR(self)); assert(p); - (*p)->flushBatchRequests(); + volatile VALUE type = callRuby(rb_path2class, "Ice::CompressBatch"); + if(callRuby(rb_obj_is_instance_of, compress, type) != Qtrue) + { + throw RubyException(rb_eTypeError, + "value for 'compress' argument must be an enumerator of Ice::CompressBatch"); + } + volatile VALUE compressValue = callRuby(rb_funcall, compress, rb_intern("to_i"), 0); + assert(TYPE(compressValue) == T_FIXNUM); + Ice::CompressBatch cb = static_cast<Ice::CompressBatch>(FIX2LONG(compressValue)); + (*p)->flushBatchRequests(cb); } ICE_RUBY_CATCH return Qnil; @@ -416,7 +425,7 @@ IceRuby::initConnection(VALUE iceModule) // Instance methods. // rb_define_method(_connectionClass, "close", CAST_METHOD(IceRuby_Connection_close), 1); - rb_define_method(_connectionClass, "flushBatchRequests", CAST_METHOD(IceRuby_Connection_flushBatchRequests), 0); + rb_define_method(_connectionClass, "flushBatchRequests", CAST_METHOD(IceRuby_Connection_flushBatchRequests), 1); rb_define_method(_connectionClass, "heartbeat", CAST_METHOD(IceRuby_Connection_heartbeat), 0); rb_define_method(_connectionClass, "setACM", CAST_METHOD(IceRuby_Connection_setACM), 3); rb_define_method(_connectionClass, "getACM", CAST_METHOD(IceRuby_Connection_getACM), 0); diff --git a/ruby/test/Ice/operations/BatchOneways.rb b/ruby/test/Ice/operations/BatchOneways.rb index 99b2225f61b..1935c52f1e8 100644 --- a/ruby/test/Ice/operations/BatchOneways.rb +++ b/ruby/test/Ice/operations/BatchOneways.rb @@ -13,6 +13,8 @@ def batchOneways(p) batch = Test::MyClassPrx::uncheckedCast(p.ice_batchOneway()) batch.ice_flushBatchRequests() # Empty flush + batch.ice_getConnection().flushBatchRequests(Ice::CompressBatch::BasedOnProxy) + batch.ice_getCommunicator().flushBatchRequests(Ice::CompressBatch::BasedOnProxy) p.opByteSOnewayCallCount() # Reset the call count @@ -26,7 +28,7 @@ def batchOneways(p) sleep(0.01) end - batch.ice_getConnection().flushBatchRequests() + batch.ice_getConnection().flushBatchRequests(Ice::CompressBatch::BasedOnProxy) batch2 = Test::MyClassPrx::uncheckedCast(p.ice_batchOneway()) diff --git a/scripts/Expect.py b/scripts/Expect.py index 7413244075d..05d768befab 100755 --- a/scripts/Expect.py +++ b/scripts/Expect.py @@ -613,7 +613,7 @@ class Expect (object): try: self.wait(timeout) - if self.mapping == "java": + if self.mapping in ["java", "java-compat"]: if self.killed is not None: if win32: test(self.exitstatus, -self.killed) diff --git a/slice/Ice/Communicator.ice b/slice/Ice/Communicator.ice index 49821037313..95c0cbadae1 100644 --- a/slice/Ice/Communicator.ice +++ b/slice/Ice/Communicator.ice @@ -23,6 +23,7 @@ #include <Ice/Current.ice> #include <Ice/Properties.ice> #include <Ice/FacetMap.ice> +#include <Ice/Connection.ice> /** * @@ -493,8 +494,11 @@ local interface Communicator * for all connections associated with the communicator. * Any errors that occur while flushing a connection are ignored. * + * @param compress Specifies whether or not the queued batch requests + * should be compressed before being sent over the wire. + * **/ - ["async-oneway"] void flushBatchRequests(); + ["async-oneway"] void flushBatchRequests(CompressBatch compress); /** * diff --git a/slice/Ice/Connection.ice b/slice/Ice/Connection.ice index af7b03a5a38..08dd20ef498 100644 --- a/slice/Ice/Connection.ice +++ b/slice/Ice/Connection.ice @@ -24,6 +24,30 @@ module Ice { /** + * The batch compression option when flushing queued batch requests. + * + **/ +["cpp:unscoped"] +local enum CompressBatch +{ + /** + * Compress the batch requests. + **/ + Yes, + + /** + * Don't compress the batch requests. + **/ + No, + + /** + * Compress the batch requests if at least one request was + * made on a compressed proxy. + **/ + BasedOnProxy +}; + +/** * * Base class providing access to the connection details. * **/ @@ -238,8 +262,11 @@ local interface Connection * This means all batch requests invoked on fixed proxies * associated with the connection. * + * @param compress Specifies whether or not the queued batch requests + * should be compressed before being sent over the wire. + * **/ - ["async-oneway"] void flushBatchRequests(); + ["async-oneway"] void flushBatchRequests(CompressBatch compress); /** * |