summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2017-02-06 11:17:34 +0100
committerBenoit Foucher <benoit@zeroc.com>2017-02-06 11:17:34 +0100
commit18ab8207bd14def950fd399c60d9ee54fab75d3b (patch)
treea82af333127184acc6be6e0969919cb20be5e8b3 /cpp/src
parentFixed ICE-7548 - getAdminProxy no longer returns 0 if synchronization is in p... (diff)
downloadice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.bz2
ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.tar.xz
ice-18ab8207bd14def950fd399c60d9ee54fab75d3b.zip
Fixed ICE-7169 and ICE-7375 - add option to specify if batch requests flushed with the communicator/connection should be compressed
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Glacier2/RequestQueue.cpp2
-rw-r--r--cpp/src/Ice/BatchRequestQueue.cpp24
-rw-r--r--cpp/src/Ice/BatchRequestQueue.h5
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp197
-rw-r--r--cpp/src/Ice/CommunicatorI.h51
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp14
-rw-r--r--cpp/src/Ice/ConnectionFactory.h4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp125
-rw-r--r--cpp/src/Ice/ConnectionI.h18
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.cpp7
-rw-r--r--cpp/src/Ice/ObjectAdapterFactory.h2
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp5
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h2
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp293
-rw-r--r--cpp/src/Ice/Proxy.cpp88
-rw-r--r--cpp/src/Ice/Reference.cpp25
-rw-r--r--cpp/src/Ice/Reference.h4
-rw-r--r--cpp/src/slice2cpp/Gen.cpp7
-rw-r--r--cpp/src/slice2objc/Gen.cpp32
19 files changed, 531 insertions, 374 deletions
diff --git a/cpp/src/Glacier2/RequestQueue.cpp b/cpp/src/Glacier2/RequestQueue.cpp
index 5acc66dbfa9..423b52b4e87 100644
--- a/cpp/src/Glacier2/RequestQueue.cpp
+++ b/cpp/src/Glacier2/RequestQueue.cpp
@@ -374,7 +374,7 @@ Glacier2::RequestQueue::flush()
if(flushBatchRequests)
{
- Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(_flushCallback);
+ Ice::AsyncResultPtr result = _connection->begin_flushBatchRequests(Ice::BasedOnProxy, _flushCallback);
if(!result->sentSynchronously() && !result->isCompleted())
{
_pendingSend = true;
diff --git a/cpp/src/Ice/BatchRequestQueue.cpp b/cpp/src/Ice/BatchRequestQueue.cpp
index 5eb2758b00e..50b69a5436e 100644
--- a/cpp/src/Ice/BatchRequestQueue.cpp
+++ b/cpp/src/Ice/BatchRequestQueue.cpp
@@ -10,6 +10,7 @@
#include <Ice/BatchRequestQueue.h>
#include <Ice/Instance.h>
#include <Ice/Properties.h>
+#include <Ice/Reference.h>
using namespace std;
using namespace Ice;
@@ -34,7 +35,7 @@ public:
virtual void
enqueue() const
{
- _queue.enqueueBatchRequest();
+ _queue.enqueueBatchRequest(_proxy);
}
virtual int
@@ -70,6 +71,7 @@ BatchRequestQueue::BatchRequestQueue(const InstancePtr& instance, bool datagram)
_batchStream(instance.get(), Ice::currentProtocolEncoding),
_batchStreamInUse(false),
_batchStreamCanFlush(false),
+ _batchCompress(false),
_batchRequestNum(0)
{
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
@@ -101,7 +103,9 @@ BatchRequestQueue::prepareBatchRequest(OutputStream* os)
}
void
-BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr& proxy, const std::string& operation)
+BatchRequestQueue::finishBatchRequest(OutputStream* os,
+ const Ice::ObjectPrxPtr& proxy,
+ const std::string& operation)
{
//
// No need for synchronization, no other threads are supposed
@@ -135,6 +139,11 @@ BatchRequestQueue::finishBatchRequest(OutputStream* os, const Ice::ObjectPrxPtr&
}
else
{
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}
@@ -170,7 +179,7 @@ BatchRequestQueue::abortBatchRequest(OutputStream* os)
}
int
-BatchRequestQueue::swap(OutputStream* os)
+BatchRequestQueue::swap(OutputStream* os, bool& compress)
{
Lock sync(*this);
if(_batchRequestNum == 0)
@@ -189,11 +198,13 @@ BatchRequestQueue::swap(OutputStream* os)
int requestNum = _batchRequestNum;
_batchStream.swap(*os);
+ compress = _batchCompress;
//
// Reset the batch.
//
_batchRequestNum = 0;
+ _batchCompress = false;
_batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
_batchMarker = _batchStream.b.size();
if(!lastRequest.empty())
@@ -231,9 +242,14 @@ BatchRequestQueue::waitStreamInUse(bool flush)
}
void
-BatchRequestQueue::enqueueBatchRequest()
+BatchRequestQueue::enqueueBatchRequest(const Ice::ObjectPrxPtr& proxy)
{
assert(_batchMarker < _batchStream.b.size());
+ bool compress;
+ if(proxy->_getReference()->getCompressOverride(compress))
+ {
+ _batchCompress |= compress;
+ }
_batchMarker = _batchStream.b.size();
++_batchRequestNum;
}
diff --git a/cpp/src/Ice/BatchRequestQueue.h b/cpp/src/Ice/BatchRequestQueue.h
index 878b2a2ef6d..9bdc2b90790 100644
--- a/cpp/src/Ice/BatchRequestQueue.h
+++ b/cpp/src/Ice/BatchRequestQueue.h
@@ -33,12 +33,12 @@ public:
void finishBatchRequest(Ice::OutputStream*, const Ice::ObjectPrxPtr&, const std::string&);
void abortBatchRequest(Ice::OutputStream*);
- int swap(Ice::OutputStream*);
+ int swap(Ice::OutputStream*, bool&);
void destroy(const Ice::LocalException&);
bool isEmpty();
- void enqueueBatchRequest();
+ void enqueueBatchRequest(const Ice::ObjectPrxPtr&);
private:
@@ -52,6 +52,7 @@ private:
Ice::OutputStream _batchStream;
bool _batchStreamInUse;
bool _batchStreamCanFlush;
+ bool _batchCompress;
int _batchRequestNum;
size_t _batchMarker;
IceInternal::UniquePtr<Ice::LocalException> _exception;
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index c47a3299c19..9b9e00ce239 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -29,6 +29,168 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
+#endif
+
+CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
+{
+ // Out of line to avoid weak vtable
+}
+
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
+ OutgoingAsyncBase(instance)
+{
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+}
+
+void
+CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con, Ice::CompressBatch compressBatch)
+{
+ class FlushBatch : public OutgoingAsyncBase
+ {
+ public:
+
+ FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
+ {
+ }
+
+ virtual bool
+ sent()
+ {
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual bool
+ exception(const Exception& ex)
+ {
+ _childObserver.failed(ex.ice_id());
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual InvocationObserver&
+ getObserver()
+ {
+ return _observer;
+ }
+
+ virtual bool handleSent(bool, bool)
+ {
+ return false;
+ }
+
+ virtual bool handleException(const Ice::Exception&)
+ {
+ return false;
+ }
+
+ virtual bool handleResponse(bool)
+ {
+ return false;
+ }
+
+ virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ private:
+
+ const CommunicatorFlushBatchAsyncPtr _outAsync;
+ InvocationObserver& _observer;
+ };
+
+ {
+ Lock sync(_m);
+ ++_useCount;
+ }
+
+ try
+ {
+ OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
+ bool compress;
+ int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs(), compress);
+ if(batchRequestNum == 0)
+ {
+ flushBatch->sent();
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ con->sendAsyncRequest(flushBatch, compress, false, batchRequestNum);
+ }
+ }
+ catch(const LocalException&)
+ {
+ check(false);
+ throw;
+ }
+}
+
+void
+CommunicatorFlushBatchAsync::invoke(const string& operation, CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ check(true);
+}
+
+void
+CommunicatorFlushBatchAsync::check(bool userThread)
+{
+ {
+ Lock sync(_m);
+ assert(_useCount > 0);
+ if(--_useCount > 0)
+ {
+ return;
+ }
+ }
+
+ if(sentImpl(true))
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ invokeSent();
+ }
+ else
+ {
+ invokeSentAsync();
+ }
+ }
+}
+
void
Ice::CommunicatorI::destroy() ICE_NOEXCEPT
{
@@ -210,13 +372,15 @@ const ::std::string flushBatchRequests_name = "flushBatchRequests";
#ifdef ICE_CPP11_MAPPING
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- Communicator::flushBatchRequestsAsync().get();
+ Communicator::flushBatchRequestsAsync(compress).get();
}
::std::function<void()>
-Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, function<void(bool)> sent)
+Ice::CommunicatorI::flushBatchRequestsAsync(CompressBatch compress,
+ function<void(exception_ptr)> ex,
+ function<void(bool)> sent)
{
class CommunicatorFlushBatchLambda : public CommunicatorFlushBatchAsync, public LambdaInvoke
{
@@ -230,39 +394,44 @@ Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, fu
}
};
auto outAsync = make_shared<CommunicatorFlushBatchLambda>(_instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests()
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(::IceInternal::dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, ::IceInternal::dummyCallback, 0);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr& cb,
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Communicator_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const IceInternal::CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class CommunicatorFlushBatchAsyncWithCallback : public CommunicatorFlushBatchAsync, public CallbackCompletion
{
@@ -294,7 +463,7 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBa
};
CommunicatorFlushBatchAsyncPtr result = new CommunicatorFlushBatchAsyncWithCallback(this, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}
diff --git a/cpp/src/Ice/CommunicatorI.h b/cpp/src/Ice/CommunicatorI.h
index c1ef6279068..5c9035261f8 100644
--- a/cpp/src/Ice/CommunicatorI.h
+++ b/cpp/src/Ice/CommunicatorI.h
@@ -16,6 +16,41 @@
#include <Ice/Initialize.h>
#include <Ice/Communicator.h>
#include <Ice/CommunicatorAsync.h>
+#include <Ice/OutgoingAsync.h>
+
+namespace IceInternal
+{
+
+//
+// Class for handling Ice::Communicator::begin_flushBatchRequests
+//
+class CommunicatorFlushBatchAsync : public OutgoingAsyncBase
+{
+public:
+
+ virtual ~CommunicatorFlushBatchAsync();
+
+ CommunicatorFlushBatchAsync(const InstancePtr&);
+
+ void flushConnection(const Ice::ConnectionIPtr&, Ice::CompressBatch);
+ void invoke(const std::string&, Ice::CompressBatch);
+
+#ifdef ICE_CPP11_MAPPING
+ std::shared_ptr<CommunicatorFlushBatchAsync> shared_from_this()
+ {
+ return std::static_pointer_cast<CommunicatorFlushBatchAsync>(OutgoingAsyncBase::shared_from_this());
+ }
+#endif
+
+private:
+
+ void check(bool);
+
+ int _useCount;
+ InvocationObserver _observer;
+};
+
+}
namespace Ice
{
@@ -68,16 +103,18 @@ public:
virtual ValueFactoryManagerPtr getValueFactoryManager() const;
- virtual void flushBatchRequests();
+ virtual void flushBatchRequests(CompressBatch);
#ifdef ICE_CPP11_MAPPING
virtual ::std::function<void()>
- flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>,
+ flushBatchRequestsAsync(CompressBatch,
+ ::std::function<void(::std::exception_ptr)>,
::std::function<void(bool)> = nullptr);
#else
- virtual AsyncResultPtr begin_flushBatchRequests();
- virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0);
- virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr&,
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch,
+ const Callback_Communicator_flushBatchRequestsPtr&,
const LocalObjectPtr& = 0);
virtual void end_flushBatchRequests(const AsyncResultPtr&);
@@ -113,7 +150,9 @@ private:
friend ICE_API ::IceUtil::TimerPtr IceInternal::getInstanceTimer(const ::Ice::CommunicatorPtr&);
#ifndef ICE_CPP11_MAPPING
- AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch,
+ const IceInternal::CallbackBasePtr&,
+ const LocalObjectPtr&);
#endif
const ::IceInternal::InstancePtr _instance;
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 755ab969003..0234c1c33dd 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -25,6 +25,7 @@
#include <Ice/LocalException.h>
#include <Ice/Functional.h>
#include <Ice/OutgoingAsync.h>
+#include <Ice/CommunicatorI.h>
#include <IceUtil/Random.h>
#include <iterator>
@@ -223,7 +224,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
void
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts,
+ bool hasMore,
Ice::EndpointSelectionType selType,
const CreateConnectionCallbackPtr& callback)
{
@@ -335,7 +337,8 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad
}
void
-IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c;
@@ -355,7 +358,7 @@ IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
@@ -1276,7 +1279,8 @@ IceInternal::IncomingConnectionFactory::connections() const
}
void
-IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ Ice::CompressBatch compress)
{
list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here.
@@ -1284,7 +1288,7 @@ IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const Communicat
{
try
{
- outAsync->flushConnection(*p);
+ outAsync->flushConnection(*p, compress);
}
catch(const LocalException&)
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 69331e04f77..b8058ab593e 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -66,7 +66,7 @@ public:
void create(const std::vector<EndpointIPtr>&, bool, Ice::EndpointSelectionType, const CreateConnectionCallbackPtr&);
void setRouterInfo(const RouterInfoPtr&);
void removeAdapter(const Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch);
OutgoingConnectionFactory(const Ice::CommunicatorPtr&, const InstancePtr&);
virtual ~OutgoingConnectionFactory();
@@ -191,7 +191,7 @@ public:
EndpointIPtr endpoint() const;
std::list<Ice::ConnectionIPtr> connections() const;
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, Ice::CompressBatch);
//
// Operations from EventHandler
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 23388d399c6..598f4c983d3 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -133,6 +133,25 @@ private:
const bool _close;
};
+//
+// Class for handling Ice::Connection::begin_flushBatchRequests
+//
+class ConnectionFlushBatchAsync : public OutgoingAsyncBase
+{
+public:
+
+ ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&);
+
+ virtual Ice::ConnectionPtr getConnection() const;
+
+ void invoke(const std::string&, Ice::CompressBatch);
+
+private:
+
+ const Ice::ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr;
+
ConnectionState connectionStateMap[] = {
ConnectionStateValidating, // StateNotInitialized
ConnectionStateValidating, // StateNotValidated
@@ -146,6 +165,72 @@ ConnectionState connectionStateMap[] = {
}
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
+ OutgoingAsyncBase(instance), _connection(connection)
+{
+}
+
+ConnectionPtr
+ConnectionFlushBatchAsync::getConnection() const
+{
+ return _connection;
+}
+
+void
+ConnectionFlushBatchAsync::invoke(const string& operation, Ice::CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ try
+ {
+ AsyncStatus status;
+ bool compress;
+ int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os, compress);
+ if(batchRequestNum == 0)
+ {
+ status = AsyncStatusSent;
+ if(sent())
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, batchRequestNum);
+ }
+
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+}
+
Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
{
}
@@ -698,13 +783,14 @@ Ice::ConnectionI::getBatchRequestQueue() const
#ifdef ICE_CPP11_MAPPING
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- Connection::flushBatchRequestsAsync().get();
+ Connection::flushBatchRequestsAsync(compress).get();
}
std::function<void()>
-Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex,
+Ice::ConnectionI::flushBatchRequestsAsync(CompressBatch compress,
+ ::std::function<void(::std::exception_ptr)> ex,
::std::function<void(bool)> sent)
{
class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke
@@ -720,37 +806,42 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_
}
};
auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests()
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, dummyCallback, 0);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb,
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Connection_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion
{
@@ -791,8 +882,12 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, cons
Ice::ConnectionPtr _connection;
};
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this,
+ _communicator,
+ _instance,
+ cb,
+ cookie);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 7fb3781e880..bd6398362df 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -175,16 +175,18 @@ public:
IceInternal::BatchRequestQueuePtr getBatchRequestQueue() const;
- virtual void flushBatchRequests();
+ virtual void flushBatchRequests(CompressBatch);
#ifdef ICE_CPP11_MAPPING
virtual std::function<void()>
- flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)>,
- ::std::function<void(bool)> = nullptr);
+ flushBatchRequestsAsync(CompressBatch,
+ ::std::function<void(::std::exception_ptr)>,
+ ::std::function<void(bool)> = nullptr);
#else
- virtual AsyncResultPtr begin_flushBatchRequests();
- virtual AsyncResultPtr begin_flushBatchRequests(const CallbackPtr&, const LocalObjectPtr& = 0);
- virtual AsyncResultPtr begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr&,
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch, const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_flushBatchRequests(CompressBatch,
+ const Callback_Connection_flushBatchRequestsPtr&,
const LocalObjectPtr& = 0);
virtual void end_flushBatchRequests(const AsyncResultPtr&);
@@ -320,7 +322,9 @@ private:
void reap();
#ifndef ICE_CPP11_MAPPING
- AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_flushBatchRequests(CompressBatch,
+ const IceInternal::CallbackBasePtr&,
+ const LocalObjectPtr&);
AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
#endif
diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp
index 47398bc8720..2e9dd66ff0c 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.cpp
+++ b/cpp/src/Ice/ObjectAdapterFactory.cpp
@@ -134,7 +134,7 @@ IceInternal::ObjectAdapterFactory::updateObservers(void (ObjectAdapterI::*fn)())
adapters = _adapters;
}
#ifdef ICE_CPP11_MAPPING
- for_each(adapters.begin(), adapters.end(),
+ for_each(adapters.begin(), adapters.end(),
[fn](const ObjectAdapterIPtr& adapter)
{
(adapter.get() ->* fn)();
@@ -231,7 +231,8 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a
}
void
-IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync) const
+IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ CompressBatch compressBatch) const
{
list<ObjectAdapterIPtr> adapters;
{
@@ -242,7 +243,7 @@ IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlu
for(list<ObjectAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- (*p)->flushAsyncBatchRequests(outAsync);
+ (*p)->flushAsyncBatchRequests(outAsync, compressBatch);
}
}
diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h
index 93a6aede7df..523ec46b304 100644
--- a/cpp/src/Ice/ObjectAdapterFactory.h
+++ b/cpp/src/Ice/ObjectAdapterFactory.h
@@ -38,7 +38,7 @@ public:
::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrxPtr&);
::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrxPtr&);
void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&);
- void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&) const;
+ void flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr&, ::Ice::CompressBatch) const;
ObjectAdapterFactory(const InstancePtr&, const ::Ice::CommunicatorPtr&);
virtual ~ObjectAdapterFactory();
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index ac60baa4a0c..e7c6e529095 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -9,6 +9,7 @@
#include <Ice/UUID.h>
#include <Ice/ObjectAdapterI.h>
+#include <Ice/CommunicatorI.h>
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/Instance.h>
#include <Ice/Proxy.h>
@@ -786,7 +787,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrxPtr& proxy) const
}
void
-Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync)
+Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, CompressBatch compress)
{
vector<IncomingConnectionFactoryPtr> f;
{
@@ -796,7 +797,7 @@ Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPt
for(vector<IncomingConnectionFactoryPtr>::const_iterator p = f.begin(); p != f.end(); ++p)
{
- (*p)->flushAsyncBatchRequests(outAsync);
+ (*p)->flushAsyncBatchRequests(outAsync, compress);
}
}
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index 855d836f471..d86ce43235c 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -92,7 +92,7 @@ public:
bool isLocal(const ObjectPrxPtr&) const;
- void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&);
+ void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchAsyncPtr&, CompressBatch);
void updateConnectionObservers();
void updateThreadObservers();
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index bd82a31e9a5..15a2d819260 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -29,7 +29,6 @@ using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
#endif
const unsigned char OutgoingAsyncBase::OK = 0x1;
@@ -1181,298 +1180,6 @@ OutgoingAsync::throwUserException()
#endif
-ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy)
-{
-}
-
-AsyncStatus
-ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool)
-{
- if(_batchRequestNum == 0)
- {
- if(sent())
- {
- return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
- }
- else
- {
- return AsyncStatusSent;
- }
- }
- _cachedConnection = connection;
- return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum);
-}
-
-AsyncStatus
-ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
-{
- if(_batchRequestNum == 0)
- {
- if(sent())
- {
- return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
- }
- else
- {
- return AsyncStatusSent;
- }
- }
- return handler->invokeAsyncRequest(this, _batchRequestNum, false);
-}
-
-void
-ProxyFlushBatchAsync::invoke(const string& operation)
-{
- checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol()));
- _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
- _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os);
- invokeImpl(true); // userThread = true
-}
-
-ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx)
-{
-}
-
-AsyncStatus
-ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
-{
- _cachedConnection = connection;
- if(responseImpl(true))
- {
- invokeResponseAsync();
- }
- return AsyncStatusSent;
-}
-
-AsyncStatus
-ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
-{
- if(responseImpl(true))
- {
- invokeResponseAsync();
- }
- return AsyncStatusSent;
-}
-
-Ice::ConnectionPtr
-ProxyGetConnection::getConnection() const
-{
- return _cachedConnection;
-}
-
-void
-ProxyGetConnection::invoke(const string& operation)
-{
- _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
- invokeImpl(true); // userThread = true
-}
-
-ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
- OutgoingAsyncBase(instance), _connection(connection)
-{
-}
-
-ConnectionPtr
-ConnectionFlushBatchAsync::getConnection() const
-{
- return _connection;
-}
-
-void
-ConnectionFlushBatchAsync::invoke(const string& operation)
-{
- _observer.attach(_instance.get(), operation);
- try
- {
- AsyncStatus status;
- int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os);
- if(batchRequestNum == 0)
- {
- status = AsyncStatusSent;
- if(sent())
- {
- status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
- }
- }
- else
- {
- status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, batchRequestNum);
- }
-
- if(status & AsyncStatusSent)
- {
- _sentSynchronously = true;
- if(status & AsyncStatusInvokeSentCallback)
- {
- invokeSent();
- }
- }
- }
- catch(const RetryException& ex)
- {
- if(exception(*ex.get()))
- {
- invokeExceptionAsync();
- }
- }
- catch(const Exception& ex)
- {
- if(exception(ex))
- {
- invokeExceptionAsync();
- }
- }
-}
-
-CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
-{
- // Out of line to avoid weak vtable
-}
-
-CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
- OutgoingAsyncBase(instance)
-{
- //
- // _useCount is initialized to 1 to prevent premature callbacks.
- // The caller must invoke ready() after all flush requests have
- // been initiated.
- //
- _useCount = 1;
-}
-
-void
-CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
-{
- class FlushBatch : public OutgoingAsyncBase
- {
- public:
-
- FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
- const InstancePtr& instance,
- InvocationObserver& observer) :
- OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
- {
- }
-
- virtual bool
- sent()
- {
- _childObserver.detach();
- _outAsync->check(false);
- return false;
- }
-
- virtual bool
- exception(const Exception& ex)
- {
- _childObserver.failed(ex.ice_id());
- _childObserver.detach();
- _outAsync->check(false);
- return false;
- }
-
- virtual InvocationObserver&
- getObserver()
- {
- return _observer;
- }
-
- virtual bool handleSent(bool, bool)
- {
- return false;
- }
-
- virtual bool handleException(const Ice::Exception&)
- {
- return false;
- }
-
- virtual bool handleResponse(bool)
- {
- return false;
- }
-
- virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
- {
- assert(false);
- }
-
- private:
-
- const CommunicatorFlushBatchAsyncPtr _outAsync;
- InvocationObserver& _observer;
- };
-
- {
- Lock sync(_m);
- ++_useCount;
- }
-
- try
- {
- OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
- int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs());
- if(batchRequestNum == 0)
- {
- flushBatch->sent();
- }
- else
- {
- con->sendAsyncRequest(flushBatch, false, false, batchRequestNum);
- }
- }
- catch(const LocalException&)
- {
- check(false);
- throw;
- }
-}
-
-void
-CommunicatorFlushBatchAsync::invoke(const string& operation)
-{
- _observer.attach(_instance.get(), operation);
- _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
- _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS);
- check(true);
-}
-
-void
-CommunicatorFlushBatchAsync::check(bool userThread)
-{
- {
- Lock sync(_m);
- assert(_useCount > 0);
- if(--_useCount > 0)
- {
- return;
- }
- }
-
- if(sentImpl(true))
- {
- if(userThread)
- {
- _sentSynchronously = true;
- invokeSent();
- }
- else
- {
- invokeSentAsync();
- }
- }
-}
-
#ifdef ICE_CPP11_MAPPING
bool
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 4c36e74e369..2535c05c1e4 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -14,6 +14,7 @@
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/Reference.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/EndpointI.h>
#include <Ice/Instance.h>
#include <Ice/RouterInfo.h>
@@ -48,6 +49,93 @@ const string ice_flushBatchRequests_name = "ice_flushBatchRequests";
}
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy) : ProxyOutgoingAsyncBase(proxy)
+{
+}
+
+AsyncStatus
+ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool)
+{
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
+ _cachedConnection = connection;
+ return connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, _batchRequestNum);
+}
+
+AsyncStatus
+ProxyFlushBatchAsync::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ if(_batchRequestNum == 0)
+ {
+ if(sent())
+ {
+ return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback);
+ }
+ else
+ {
+ return AsyncStatusSent;
+ }
+ }
+ return handler->invokeAsyncRequest(this, _batchRequestNum, false);
+}
+
+void
+ProxyFlushBatchAsync::invoke(const string& operation)
+{
+ checkSupportedProtocol(getCompatibleProtocol(_proxy->_getReference()->getProtocol()));
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
+ bool compress; // Ignore for proxy flushBatchRequests
+ _batchRequestNum = _proxy->_getBatchRequestQueue()->swap(&_os, compress);
+ invokeImpl(true); // userThread = true
+}
+
+ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx) : ProxyOutgoingAsyncBase(prx)
+{
+}
+
+AsyncStatus
+ProxyGetConnection::invokeRemote(const ConnectionIPtr& connection, bool, bool)
+{
+ _cachedConnection = connection;
+ if(responseImpl(true))
+ {
+ invokeResponseAsync();
+ }
+ return AsyncStatusSent;
+}
+
+AsyncStatus
+ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*)
+{
+ if(responseImpl(true))
+ {
+ invokeResponseAsync();
+ }
+ return AsyncStatusSent;
+}
+
+Ice::ConnectionPtr
+ProxyGetConnection::getConnection() const
+{
+ return _cachedConnection;
+}
+
+void
+ProxyGetConnection::invoke(const string& operation)
+{
+ _observer.attach(_proxy, operation, ::Ice::noExplicitContext);
+ invokeImpl(true); // userThread = true
+}
+
#ifdef ICE_CPP11_MAPPING // C++11 mapping
namespace Ice
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp
index 188ad33e5ee..31cd7631479 100644
--- a/cpp/src/Ice/Reference.cpp
+++ b/cpp/src/Ice/Reference.cpp
@@ -174,6 +174,25 @@ IceInternal::Reference::changeCompress(bool newCompress) const
return r;
}
+bool
+IceInternal::Reference::getCompressOverride(bool& compress) const
+{
+ DefaultsAndOverridesPtr defaultsAndOverrides = getInstance()->defaultsAndOverrides();
+ if(defaultsAndOverrides->overrideCompress)
+ {
+ compress = defaultsAndOverrides->overrideCompressValue;
+ }
+ else if(_overrideCompress)
+ {
+ compress = _compress;
+ }
+ else
+ {
+ return false;
+ }
+ return true;
+}
+
Int
Reference::hash() const
{
@@ -791,7 +810,7 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c
_fixedConnection->throwException(); // Throw in case our connection is already destroyed.
- bool compress;
+ bool compress = false;
if(defaultsAndOverrides->overrideCompress)
{
compress = defaultsAndOverrides->overrideCompressValue;
@@ -800,10 +819,6 @@ IceInternal::FixedReference::getRequestHandler(const Ice::ObjectPrxPtr& proxy) c
{
compress = _compress;
}
- else
- {
- compress = _fixedConnection->endpoint()->compress();
- }
ReferencePtr ref = const_cast<FixedReference*>(this);
return proxy->_setRequestHandler(ICE_MAKE_SHARED(ConnectionRequestHandler, ref, _fixedConnection, compress));
diff --git a/cpp/src/Ice/Reference.h b/cpp/src/Ice/Reference.h
index a48a61c66e4..7a19ab3525f 100644
--- a/cpp/src/Ice/Reference.h
+++ b/cpp/src/Ice/Reference.h
@@ -41,7 +41,7 @@ class Reference : public IceUtil::Shared
{
public:
- class GetConnectionCallback
+ class GetConnectionCallback
#ifndef ICE_CPP11_MAPPING
: public virtual IceUtil::Shared
#endif
@@ -116,6 +116,8 @@ public:
int hash() const; // Conceptually const.
+ bool getCompressOverride(bool&) const;
+
//
// Utility methods.
//
diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp
index 7beb1fb7371..0173509b925 100644
--- a/cpp/src/slice2cpp/Gen.cpp
+++ b/cpp/src/slice2cpp/Gen.cpp
@@ -6620,7 +6620,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
{
vector<string> paramsDeclAMI;
vector<string> outParamsDeclAMI;
-
+ vector<string> paramsArgAMI;
ParamDeclList paramList = p->parameters();
for(ParamDeclList::const_iterator r = paramList.begin(); r != paramList.end(); ++r)
{
@@ -6632,6 +6632,7 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
{
typeString = inputTypeToString((*r)->type(), (*r)->optional(), metaData, typeCtx);
paramsDeclAMI.push_back(typeString + ' ' + paramName);
+ paramsArgAMI.push_back(paramName);
}
}
@@ -6663,11 +6664,11 @@ Slice::Gen::Cpp11LocalObjectVisitor::visitOperation(const OperationPtr& p)
H << nl << name << "Async(";
H.useCurrentPosAsIndent();
- for(vector<string>::const_iterator i = paramsDeclAMI.begin(); i != paramsDeclAMI.end(); ++i)
+ for(vector<string>::const_iterator i = paramsArgAMI.begin(); i != paramsArgAMI.end(); ++i)
{
H << *i << ",";
}
- if(!paramsDeclAMI.empty())
+ if(!paramsArgAMI.empty())
{
H << nl;
}
diff --git a/cpp/src/slice2objc/Gen.cpp b/cpp/src/slice2objc/Gen.cpp
index 80b5609417e..7ad7d9883e6 100644
--- a/cpp/src/slice2objc/Gen.cpp
+++ b/cpp/src/slice2objc/Gen.cpp
@@ -1214,15 +1214,29 @@ Slice::Gen::TypesVisitor::visitOperation(const OperationPtr& p)
if(cl->isLocal() && (cl->hasMetaData("async-oneway") || p->hasMetaData("async-oneway")))
{
- // TODO: add support for parameters when needed.
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name << deprecateSymbol << ";";
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name << ":(void(^)(ICEException*))exception"
- << deprecateSymbol << ";";
- _H << nl << "-(id<ICEAsyncResult>) begin_" << name
- << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent"
- << deprecateSymbol << ";";
- _H << nl << "-(void) end_" << name << ":(id<ICEAsyncResult>)result"
- << deprecateSymbol << ";";
+ string marshalParams = getMarshalParams(p);
+ string unmarshalParams = getUnmarshalParams(p);
+
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams << deprecateSymbol << ";";
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams;
+ if(!marshalParams.empty())
+ {
+ _H << " exception";
+ }
+ _H << ":(void(^)(ICEException*))exception" << deprecateSymbol << ";";
+ _H << nl << "-(id<ICEAsyncResult>) begin_" << name << marshalParams;
+ if(!marshalParams.empty())
+ {
+ _H << " exception";
+ }
+ _H << ":(void(^)(ICEException*))exception sent:(void(^)(BOOL))sent" << deprecateSymbol << ";";
+
+ _H << nl << "-(void) end_" << name << unmarshalParams;
+ if(!unmarshalParams.empty())
+ {
+ _H << " result";
+ }
+ _H << ":(id<ICEAsyncResult>)result" << deprecateSymbol << ";";
}
}