diff options
Diffstat (limited to 'cpp/src/Ice/CommunicatorI.cpp')
-rw-r--r-- | cpp/src/Ice/CommunicatorI.cpp | 197 |
1 files changed, 183 insertions, 14 deletions
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index c47a3299c19..9b9e00ce239 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -29,6 +29,168 @@ using namespace std; using namespace Ice; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; } +#endif + +CommunicatorFlushBatchAsync::~CommunicatorFlushBatchAsync() +{ + // Out of line to avoid weak vtable +} + +CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const InstancePtr& instance) : + OutgoingAsyncBase(instance) +{ + // + // _useCount is initialized to 1 to prevent premature callbacks. + // The caller must invoke ready() after all flush requests have + // been initiated. + // + _useCount = 1; +} + +void +CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con, Ice::CompressBatch compressBatch) +{ + class FlushBatch : public OutgoingAsyncBase + { + public: + + FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync, + const InstancePtr& instance, + InvocationObserver& observer) : + OutgoingAsyncBase(instance), _outAsync(outAsync), _observer(observer) + { + } + + virtual bool + sent() + { + _childObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual bool + exception(const Exception& ex) + { + _childObserver.failed(ex.ice_id()); + _childObserver.detach(); + _outAsync->check(false); + return false; + } + + virtual InvocationObserver& + getObserver() + { + return _observer; + } + + virtual bool handleSent(bool, bool) + { + return false; + } + + virtual bool handleException(const Ice::Exception&) + { + return false; + } + + virtual bool handleResponse(bool) + { + return false; + } + + virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeException(const Ice::Exception&, OutgoingAsyncBase*) const + { + assert(false); + } + + virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const + { + assert(false); + } + + private: + + const CommunicatorFlushBatchAsyncPtr _outAsync; + InvocationObserver& _observer; + }; + + { + Lock sync(_m); + ++_useCount; + } + + try + { + OutgoingAsyncBasePtr flushBatch = ICE_MAKE_SHARED(FlushBatch, ICE_SHARED_FROM_THIS, _instance, _observer); + bool compress; + int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs(), compress); + if(batchRequestNum == 0) + { + flushBatch->sent(); + } + else + { + if(compressBatch == Ice::Yes) + { + compress = true; + } + else if(compressBatch == Ice::No) + { + compress = false; + } + con->sendAsyncRequest(flushBatch, compress, false, batchRequestNum); + } + } + catch(const LocalException&) + { + check(false); + throw; + } +} + +void +CommunicatorFlushBatchAsync::invoke(const string& operation, CompressBatch compressBatch) +{ + _observer.attach(_instance.get(), operation); + _instance->outgoingConnectionFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch); + _instance->objectAdapterFactory()->flushAsyncBatchRequests(ICE_SHARED_FROM_THIS, compressBatch); + check(true); +} + +void +CommunicatorFlushBatchAsync::check(bool userThread) +{ + { + Lock sync(_m); + assert(_useCount > 0); + if(--_useCount > 0) + { + return; + } + } + + if(sentImpl(true)) + { + if(userThread) + { + _sentSynchronously = true; + invokeSent(); + } + else + { + invokeSentAsync(); + } + } +} + void Ice::CommunicatorI::destroy() ICE_NOEXCEPT { @@ -210,13 +372,15 @@ const ::std::string flushBatchRequests_name = "flushBatchRequests"; #ifdef ICE_CPP11_MAPPING void -Ice::CommunicatorI::flushBatchRequests() +Ice::CommunicatorI::flushBatchRequests(CompressBatch compress) { - Communicator::flushBatchRequestsAsync().get(); + Communicator::flushBatchRequestsAsync(compress).get(); } ::std::function<void()> -Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, function<void(bool)> sent) +Ice::CommunicatorI::flushBatchRequestsAsync(CompressBatch compress, + function<void(exception_ptr)> ex, + function<void(bool)> sent) { class CommunicatorFlushBatchLambda : public CommunicatorFlushBatchAsync, public LambdaInvoke { @@ -230,39 +394,44 @@ Ice::CommunicatorI::flushBatchRequestsAsync(function<void(exception_ptr)> ex, fu } }; auto outAsync = make_shared<CommunicatorFlushBatchLambda>(_instance, ex, sent); - outAsync->invoke(flushBatchRequests_name); + outAsync->invoke(flushBatchRequests_name, compress); return [outAsync]() { outAsync->cancel(); }; } #else void -Ice::CommunicatorI::flushBatchRequests() +Ice::CommunicatorI::flushBatchRequests(CompressBatch compress) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compress)); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests() +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress) { - return _iceI_begin_flushBatchRequests(::IceInternal::dummyCallback, 0); + return _iceI_begin_flushBatchRequests(compress, ::IceInternal::dummyCallback, 0); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress, + const CallbackPtr& cb, + const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBatchRequestsPtr& cb, +Ice::CommunicatorI::begin_flushBatchRequests(CompressBatch compress, + const Callback_Communicator_flushBatchRequestsPtr& cb, const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie) +Ice::CommunicatorI::_iceI_begin_flushBatchRequests(CompressBatch compress, + const IceInternal::CallbackBasePtr& cb, + const LocalObjectPtr& cookie) { class CommunicatorFlushBatchAsyncWithCallback : public CommunicatorFlushBatchAsync, public CallbackCompletion { @@ -294,7 +463,7 @@ Ice::CommunicatorI::_iceI_begin_flushBatchRequests(const IceInternal::CallbackBa }; CommunicatorFlushBatchAsyncPtr result = new CommunicatorFlushBatchAsyncWithCallback(this, _instance, cb, cookie); - result->invoke(flushBatchRequests_name); + result->invoke(flushBatchRequests_name, compress); return result; } |