diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 125 |
1 files changed, 110 insertions, 15 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 23388d399c6..598f4c983d3 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -133,6 +133,25 @@ private: const bool _close; }; +// +// Class for handling Ice::Connection::begin_flushBatchRequests +// +class ConnectionFlushBatchAsync : public OutgoingAsyncBase +{ +public: + + ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&); + + virtual Ice::ConnectionPtr getConnection() const; + + void invoke(const std::string&, Ice::CompressBatch); + +private: + + const Ice::ConnectionIPtr _connection; +}; +typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr; + ConnectionState connectionStateMap[] = { ConnectionStateValidating, // StateNotInitialized ConnectionStateValidating, // StateNotValidated @@ -146,6 +165,72 @@ ConnectionState connectionStateMap[] = { } +ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) : + OutgoingAsyncBase(instance), _connection(connection) +{ +} + +ConnectionPtr +ConnectionFlushBatchAsync::getConnection() const +{ + return _connection; +} + +void +ConnectionFlushBatchAsync::invoke(const string& operation, Ice::CompressBatch compressBatch) +{ + _observer.attach(_instance.get(), operation); + try + { + AsyncStatus status; + bool compress; + int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os, compress); + if(batchRequestNum == 0) + { + status = AsyncStatusSent; + if(sent()) + { + status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); + } + } + else + { + if(compressBatch == Ice::Yes) + { + compress = true; + } + else if(compressBatch == Ice::No) + { + compress = false; + } + status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, batchRequestNum); + } + + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); + } + } + } + catch(const RetryException& ex) + { + if(exception(*ex.get())) + { + invokeExceptionAsync(); + } + } + catch(const Exception& ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } +} + Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) { } @@ -698,13 +783,14 @@ Ice::ConnectionI::getBatchRequestQueue() const #ifdef ICE_CPP11_MAPPING void -Ice::ConnectionI::flushBatchRequests() +Ice::ConnectionI::flushBatchRequests(CompressBatch compress) { - Connection::flushBatchRequestsAsync().get(); + Connection::flushBatchRequestsAsync(compress).get(); } std::function<void()> -Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex, +Ice::ConnectionI::flushBatchRequestsAsync(CompressBatch compress, + ::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent) { class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke @@ -720,37 +806,42 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ } }; auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent); - outAsync->invoke(flushBatchRequests_name); + outAsync->invoke(flushBatchRequests_name, compress); return [outAsync]() { outAsync->cancel(); }; } #else void -Ice::ConnectionI::flushBatchRequests() +Ice::ConnectionI::flushBatchRequests(CompressBatch compress) { - end_flushBatchRequests(begin_flushBatchRequests()); + end_flushBatchRequests(begin_flushBatchRequests(compress)); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests() +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress) { - return _iceI_begin_flushBatchRequests(dummyCallback, 0); + return _iceI_begin_flushBatchRequests(compress, dummyCallback, 0); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie) +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress, + const CallbackPtr& cb, + const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb, +Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress, + const Callback_Connection_flushBatchRequestsPtr& cb, const LocalObjectPtr& cookie) { - return _iceI_begin_flushBatchRequests(cb, cookie); + return _iceI_begin_flushBatchRequests(compress, cb, cookie); } AsyncResultPtr -Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) +Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) { class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion { @@ -791,8 +882,12 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, cons Ice::ConnectionPtr _connection; }; - ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie); - result->invoke(flushBatchRequests_name); + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, + _communicator, + _instance, + cb, + cookie); + result->invoke(flushBatchRequests_name, compress); return result; } |