summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp125
1 files changed, 110 insertions, 15 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 23388d399c6..598f4c983d3 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -133,6 +133,25 @@ private:
const bool _close;
};
+//
+// Class for handling Ice::Connection::begin_flushBatchRequests
+//
+class ConnectionFlushBatchAsync : public OutgoingAsyncBase
+{
+public:
+
+ ConnectionFlushBatchAsync(const Ice::ConnectionIPtr&, const InstancePtr&);
+
+ virtual Ice::ConnectionPtr getConnection() const;
+
+ void invoke(const std::string&, Ice::CompressBatch);
+
+private:
+
+ const Ice::ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<ConnectionFlushBatchAsync> ConnectionFlushBatchAsyncPtr;
+
ConnectionState connectionStateMap[] = {
ConnectionStateValidating, // StateNotInitialized
ConnectionStateValidating, // StateNotValidated
@@ -146,6 +165,72 @@ ConnectionState connectionStateMap[] = {
}
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection, const InstancePtr& instance) :
+ OutgoingAsyncBase(instance), _connection(connection)
+{
+}
+
+ConnectionPtr
+ConnectionFlushBatchAsync::getConnection() const
+{
+ return _connection;
+}
+
+void
+ConnectionFlushBatchAsync::invoke(const string& operation, Ice::CompressBatch compressBatch)
+{
+ _observer.attach(_instance.get(), operation);
+ try
+ {
+ AsyncStatus status;
+ bool compress;
+ int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os, compress);
+ if(batchRequestNum == 0)
+ {
+ status = AsyncStatusSent;
+ if(sent())
+ {
+ status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
+ }
+ }
+ else
+ {
+ if(compressBatch == Ice::Yes)
+ {
+ compress = true;
+ }
+ else if(compressBatch == Ice::No)
+ {
+ compress = false;
+ }
+ status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, compress, false, batchRequestNum);
+ }
+
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+}
+
Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0)
{
}
@@ -698,13 +783,14 @@ Ice::ConnectionI::getBatchRequestQueue() const
#ifdef ICE_CPP11_MAPPING
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- Connection::flushBatchRequestsAsync().get();
+ Connection::flushBatchRequestsAsync(compress).get();
}
std::function<void()>
-Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex,
+Ice::ConnectionI::flushBatchRequestsAsync(CompressBatch compress,
+ ::std::function<void(::std::exception_ptr)> ex,
::std::function<void(bool)> sent)
{
class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke
@@ -720,37 +806,42 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_
}
};
auto outAsync = make_shared<ConnectionFlushBatchLambda>(ICE_SHARED_FROM_THIS, _instance, ex, sent);
- outAsync->invoke(flushBatchRequests_name);
+ outAsync->invoke(flushBatchRequests_name, compress);
return [outAsync]() { outAsync->cancel(); };
}
#else
void
-Ice::ConnectionI::flushBatchRequests()
+Ice::ConnectionI::flushBatchRequests(CompressBatch compress)
{
- end_flushBatchRequests(begin_flushBatchRequests());
+ end_flushBatchRequests(begin_flushBatchRequests(compress));
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests()
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress)
{
- return _iceI_begin_flushBatchRequests(dummyCallback, 0);
+ return _iceI_begin_flushBatchRequests(compress, dummyCallback, 0);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const CallbackPtr& cb,
+ const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchRequestsPtr& cb,
+Ice::ConnectionI::begin_flushBatchRequests(CompressBatch compress,
+ const Callback_Connection_flushBatchRequestsPtr& cb,
const LocalObjectPtr& cookie)
{
- return _iceI_begin_flushBatchRequests(cb, cookie);
+ return _iceI_begin_flushBatchRequests(compress, cb, cookie);
}
AsyncResultPtr
-Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+Ice::ConnectionI::_iceI_begin_flushBatchRequests(CompressBatch compress,
+ const CallbackBasePtr& cb,
+ const LocalObjectPtr& cookie)
{
class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion
{
@@ -791,8 +882,12 @@ Ice::ConnectionI::_iceI_begin_flushBatchRequests(const CallbackBasePtr& cb, cons
Ice::ConnectionPtr _connection;
};
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie);
- result->invoke(flushBatchRequests_name);
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this,
+ _communicator,
+ _instance,
+ cb,
+ cookie);
+ result->invoke(flushBatchRequests_name, compress);
return result;
}