summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/CommunicatorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/CommunicatorI.cpp')
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp197
1 files changed, 183 insertions, 14 deletions
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index c47a3299c19..9b9e00ce239 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -29,6 +29,168 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
+IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
+#endif
+
+CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync()
+{
+ // Out of line to avoid weak vtable
+}
+
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) :
+ OutgoingAsyncBase(instance)
+{
+ //
+ // _useCount is initialized to 1 to prevent premature callbacks.
+ // The caller must invoke ready() after all flush requests have
+ // been initiated.
+ //
+ _useCount = 1;
+}
+
+void
+CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con, Ice::CompressBatch compressBatch)
+{
+ class FlushBatch : public OutgoingAsyncBase
+ {
+ public:
+
+ FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
+ const InstancePtr& instance,
+ InvocationObserver& observer) :
+ OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer)
+ {
+ }
+
+ virtual bool
+ sent()
+ {
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual bool
+ exception(const Exception& ex)
+ {
+ _childObserver.failed(ex.ice_id());
+ _childObserver.detach();
+ _outAsync->check(false);
+ return false;
+ }
+
+ virtual InvocationObserver&
+ getObserver()
+ {
+ return _observer;
+ }
+
+ virtual bool handleSent(bool, bool)
+ {
+ return false;
+ }
+
+ virtual bool handleException(const Ice::Exception&)
+ {
+ return false;
+ }
+
+ virtual bool handleResponse(bool)
+ {
+ return false;
+ }
+
+ virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const
+ {
+ assert(false);
+ }
+
+ private:
+
+ const CommunicatorFlushBatchAsyncPtr _outAsync;
+ InvocationObserver& _observer;
+ };
+
+ {
+ Lock sync(_m);
+ ++_useCount;
+ }
+
+ try
+ {
+ OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer);
+ bool compress;
+ int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs(), compress);
+ if(batchRequestNum == 0)
+ {
+ flushBatch->sent();
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ con->sendAsyncRequest(flushBatch, compress, false, batchRequestNum);
+ }
+ }
+ catch(const LocalException&)
+ {
+ check(false);
+ throw;
+ }
+}
+
+void
+CommunicatorFlushBatchAsync::invoke(const string& operation, CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch);
+ check(true);
+}
+
+void
+CommunicatorFlushBatchAsync::check(bool userThread)
+{
+ {
+ Lock sync(_m);
+ assert(_useCount > 0);
+ if(--_useCount > 0)
+ {
+ return;
+ }
+ }
+
+ if(sentImpl(true))
+ {
+ if(userThread)
+ {
+ _sentSynchronously = true;
+ invokeSent();
+ }
+ else
+ {
+ invokeSentAsync();
+ }
+ }
+}
+
void
Ice::CommunicatorI::destroy() ICE_NOEXCEPT
{
@@ -210,13 +372,15 @@ const ::std::string flushBatchRequests_name = "flushBatchRequests";
#ifdef ICE_CPP11_MAPPING
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- Communicator::flushBatchRequestsAsync().get();
+ Communicator::flushBatchRequestsAsync(compress).get();
}
::std::function<void()>
-Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, function<void(bool)> sent)
+Ice::CommunicatorI::flushBatchRequestsAsync(CompressBatch compress,
+ function<void(exception_ptr)> ex,
+ function<void(bool)> sent)
{
class CommunicatorFlushBatchLambda : public CommunicatorFlushBatchAsync, public LambdaInvoke
{
@@ -230,39 +394,44 @@ Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, fu
}
};
auto outAsync = make_shared<CommunicatorFlushBatchLambda>(_instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::CommunicatorI::flushBatchRequests()
+Ice::CommunicatorI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests()
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(::IceInternal::dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, ::IceInternal::dummyCallback, 0);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr& cb,
+Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Communicator_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const IceInternal::CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class CommunicatorFlushBatchAsyncWithCallback : public CommunicatorFlushBatchAsync, public CallbackCompletion
{
@@ -294,7 +463,7 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBa
};
CommunicatorFlushBatchAsyncPtr result = new CommunicatorFlushBatchAsyncWithCallback(this, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}