summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2015-12-01 17:36:19 +0100
committerJose <jose@zeroc.com>2015-12-01 17:36:19 +0100
commit2029ff368e49fae489b8ec8fc12f7d126b182dfb (patch)
tree6075d652b7851831c39e6f22004d8720864f1a60 /cpp/src/Ice/OutgoingAsync.cpp
parentThe default LMDB map size for IceGrid and IceStorm is now 10MB (Windows) (diff)
downloadice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.bz2
ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.xz
ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.zip
C++11 mapping initial commit
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp397
1 files changed, 383 insertions, 14 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index ee1e6d5a437..cfe5adda62e 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -22,10 +22,12 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatchAsync* p) { return p; }
+#endif
bool
OutgoingAsyncBase::sent()
@@ -52,12 +54,20 @@ OutgoingAsyncBase::getIs()
return 0; // Must be overriden by request that can handle responses
}
+#ifdef ICE_CPP11_MAPPING
+OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ AsyncResult(communicator, instance, operation, delegate),
+#else
OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
AsyncResult(communicator, instance, operation, delegate, cookie),
+#endif
_os(instance.get(), Ice::currentProtocolEncoding)
{
}
@@ -83,7 +93,7 @@ OutgoingAsyncBase::finished(const Exception& ex)
return AsyncResult::finished(ex);
}
-Ice::ObjectPrx
+Ice::ObjectPrxPtr
ProxyOutgoingAsyncBase::getProxy() const
{
return _proxy;
@@ -101,7 +111,11 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
_cachedConnection = 0;
if(_proxy->__reference()->getInvocationTimeout() == -2)
{
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this()));
+#else
_instance->timer()->cancel(this);
+#endif
}
//
@@ -115,7 +129,12 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
// the retry interval is 0. This method can be called with the
// connection locked so we can't just retry here.
//
+#ifdef ICE_CPP11_MAPPING
+ _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()),
+ handleException(exc));
+#else
_instance->retryQueue()->add(this, handleException(exc));
+#endif
return false;
}
catch(const Exception& ex)
@@ -136,7 +155,11 @@ ProxyOutgoingAsyncBase::retryException(const Exception& ex)
// connection to be done.
//
_proxy->__updateRequestHandler(_handler, 0); // Clear request handler and always retry.
+#ifdef ICE_CPP11_MAPPING
+ _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()), 0);
+#else
_instance->retryQueue()->add(this, 0);
+#endif
}
catch(const Ice::Exception& exc)
{
@@ -155,7 +178,12 @@ ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
const int timeout = _cachedConnection->timeout();
if(timeout > 0)
{
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()),
+ IceUtil::Time::milliSeconds(timeout));
+#else
_instance->timer()->schedule(this, IceUtil::Time::milliSeconds(timeout));
+#endif
}
}
AsyncResult::cancelable(handler);
@@ -187,13 +215,20 @@ ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex)
}
}
-ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx,
+#ifdef ICE_CPP11_MAPPING
+ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate),
+#else
+ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrxPtr& prx,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
+#endif
_proxy(prx),
- _mode(Normal),
+ _mode(ICE_ENUM(OperationMode, Normal)),
_cnt(0),
_sent(false)
{
@@ -209,7 +244,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
if(invocationTimeout > 0)
{
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->schedule(dynamic_pointer_cast<TimerTask>(shared_from_this()),
+ IceUtil::Time::milliSeconds(invocationTimeout));
+#else
_instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+#endif
}
}
else
@@ -223,7 +263,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
{
_sent = false;
_handler = _proxy->__getRequestHandler();
+#ifdef ICE_CPP11_MAPPING
+ AsyncStatus status = _handler->sendAsyncRequest(
+ dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()));
+#else
AsyncStatus status = _handler->sendAsyncRequest(this);
+#endif
if(status & AsyncStatusSent)
{
if(userThread)
@@ -258,7 +303,12 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
int interval = handleException(ex);
if(interval > 0)
{
+#ifdef ICE_CPP11_MAPPING
+ _instance->retryQueue()->add(dynamic_pointer_cast<ProxyOutgoingAsyncBase>(shared_from_this()),
+ interval);
+#else
_instance->retryQueue()->add(this, interval);
+#endif
return;
}
else
@@ -293,7 +343,11 @@ ProxyOutgoingAsyncBase::sent(bool done)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this()));
+#else
+ _instance->timer()->cancel(this);
+#endif
}
}
return OutgoingAsyncBase::sent(done);
@@ -304,7 +358,11 @@ ProxyOutgoingAsyncBase::finished(const Exception& ex)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this()));
+#else
+ _instance->timer()->cancel(this);
+#endif
}
return OutgoingAsyncBase::finished(ex);
}
@@ -314,7 +372,11 @@ ProxyOutgoingAsyncBase::finished(bool ok)
{
if(_proxy->__reference()->getInvocationTimeout() != -1)
{
- _instance->timer()->cancel(this);
+#ifdef ICE_CPP11_MAPPING
+ _instance->timer()->cancel(dynamic_pointer_cast<TimerTask>(shared_from_this()));
+#else
+ _instance->timer()->cancel(this);
+#endif
}
return AsyncResult::finished(ok);
}
@@ -338,11 +400,18 @@ ProxyOutgoingAsyncBase::runTimerTask()
}
}
-OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
+#ifdef ICE_CPP11_MAPPING
+OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate),
+#else
+OutgoingAsync::OutgoingAsync(const ObjectPrxPtr& prx,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
ProxyOutgoingAsyncBase(prx, operation, delegate, cookie),
+#endif
_encoding(getCompatibleEncoding(prx->__reference()->getEncoding()))
{
}
@@ -353,7 +422,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex
checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol()));
_mode = mode;
- _observer.attach(_proxy.get(), operation, context);
+ _observer.attach(_proxy, operation, context);
switch(_proxy->__reference()->getMode())
{
@@ -394,7 +463,7 @@ OutgoingAsync::prepare(const string& operation, OperationMode mode, const Contex
_os.write(static_cast<Byte>(_mode));
- if(context != 0)
+ if(context != &Ice::noExplicitContext)
{
//
// Explicit context
@@ -429,7 +498,11 @@ AsyncStatus
OutgoingAsync::invokeRemote(const ConnectionIPtr& connection, bool compress, bool response)
{
_cachedConnection = connection;
+#ifdef ICE_CPP11_MAPPING
+ return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), compress, response, 0);
+#else
return connection->sendAsyncRequest(this, compress, response, 0);
+#endif
}
AsyncStatus
@@ -620,13 +693,20 @@ OutgoingAsync::completed()
}
}
-ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrx& proxy,
+#ifdef ICE_CPP11_MAPPING
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ ProxyOutgoingAsyncBase(proxy, operation, delegate)
+#else
+ProxyFlushBatchAsync::ProxyFlushBatchAsync(const ObjectPrxPtr& proxy,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie)
+#endif
{
- _observer.attach(proxy.get(), operation, 0);
+ _observer.attach(proxy, operation, 0);
_batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os);
}
@@ -645,7 +725,12 @@ ProxyFlushBatchAsync::invokeRemote(const ConnectionIPtr& connection, bool compre
}
}
_cachedConnection = connection;
+#ifdef ICE_CPP11_MAPPING
+ return connection->sendAsyncRequest(dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()),
+ compress, false, _batchRequestNum);
+#else
return connection->sendAsyncRequest(this, compress, false, _batchRequestNum);
+#endif
}
AsyncStatus
@@ -672,13 +757,20 @@ ProxyFlushBatchAsync::invoke()
invokeImpl(true); // userThread = true
}
-ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx,
+#ifdef ICE_CPP11_MAPPING
+ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ ProxyOutgoingAsyncBase(prx, operation, delegate)
+#else
+ProxyGetConnection::ProxyGetConnection(const ObjectPrxPtr& prx,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
ProxyOutgoingAsyncBase(prx, operation, delegate, cookie)
+#endif
{
- _observer.attach(prx.get(), operation, 0);
+ _observer.attach(prx, operation, 0);
}
AsyncStatus
@@ -708,13 +800,23 @@ ProxyGetConnection::invoke()
invokeImpl(true); // userThread = true
}
+#ifdef ICE_CPP11_MAPPING
+ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& delegate) :
+ OutgoingAsyncBase(communicator, instance, operation, delegate),
+#else
ConnectionFlushBatchAsync::ConnectionFlushBatchAsync(const ConnectionIPtr& connection,
const CommunicatorPtr& communicator,
const InstancePtr& instance,
const string& operation,
const CallbackBasePtr& delegate,
const LocalObjectPtr& cookie) :
- OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection)
+ OutgoingAsyncBase(communicator, instance, operation, delegate, cookie),
+#endif
+ _connection(connection)
{
_observer.attach(instance.get(), operation);
}
@@ -742,7 +844,12 @@ ConnectionFlushBatchAsync::invoke()
}
else
{
+#ifdef ICE_CPP11_MAPPING
+ status = _connection->sendAsyncRequest(
+ dynamic_pointer_cast<OutgoingAsyncBase>(shared_from_this()), false, false, batchRequestNum);
+#else
status = _connection->sendAsyncRequest(this, false, false, batchRequestNum);
+#endif
}
if(status & AsyncStatusSent)
@@ -770,12 +877,20 @@ ConnectionFlushBatchAsync::invoke()
}
}
+#ifdef ICE_CPP11_MAPPING
+CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const string& operation,
+ const CallbackBasePtr& cb) :
+ AsyncResult(communicator, instance, operation, cb)
+#else
CommunicatorFlushBatchAsync::CommunicatorFlushBatchAsync(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const string& operation,
const CallbackBasePtr& cb,
const LocalObjectPtr& cookie) :
AsyncResult(communicator, instance, operation, cb, cookie)
+#endif
{
_observer.attach(instance.get(), operation);
@@ -797,7 +912,11 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
FlushBatch(const CommunicatorFlushBatchAsyncPtr& outAsync,
const InstancePtr& instance,
InvocationObserver& observer) :
+#ifdef ICE_CPP11_MAPPING
+ OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback),
+#else
OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
+#endif
_outAsync(outAsync),
_observer(observer)
{
@@ -836,7 +955,12 @@ CommunicatorFlushBatchAsync::flushConnection(const ConnectionIPtr& con)
try
{
+#ifdef ICE_CPP11_MAPPING
+ auto flushBatch = make_shared<FlushBatch>(
+ dynamic_pointer_cast<CommunicatorFlushBatchAsync>(shared_from_this()), _instance, _observer);
+#else
OutgoingAsyncBasePtr flushBatch = new FlushBatch(this, _instance, _observer);
+#endif
int batchRequestNum = con->getBatchRequestQueue()->swap(flushBatch->getOs());
if(batchRequestNum == 0)
{
@@ -885,3 +1009,248 @@ CommunicatorFlushBatchAsync::check(bool userThread)
}
}
}
+
+#ifdef ICE_CPP11_MAPPING
+OnewayClosureCallback::OnewayClosureCallback(
+ const string& name,
+ const shared_ptr<Ice::ObjectPrx>& proxy,
+ function<void ()> response,
+ function<void (exception_ptr)> exception,
+ function<void (bool)> sent) :
+ __name(name),
+ __proxy(proxy),
+ __response(response),
+ __exception(exception),
+ __sent(sent)
+{
+}
+
+void
+OnewayClosureCallback::sent(const AsyncResultPtr& __result) const
+{
+ if(__sent)
+ {
+ __sent(__result->sentSynchronously());
+ }
+}
+
+bool
+OnewayClosureCallback::hasSentCallback() const
+{
+ return __sent != nullptr;
+}
+
+void
+OnewayClosureCallback::completed(const AsyncResultPtr& __result) const
+{
+ try
+ {
+ AsyncResult::__check(__result, __proxy.get(), __name);
+ bool ok = __result->__wait();
+ if(__proxy->__reference()->getMode() == Reference::ModeTwoway)
+ {
+ if(!ok)
+ {
+ try
+ {
+ __result->__throwUserException();
+ }
+ catch(const UserException& __ex)
+ {
+ throw UnknownUserException(__FILE__, __LINE__, __ex.ice_name());
+ }
+ }
+ __result->__readEmptyParams();
+ if(__response)
+ {
+ try
+ {
+ __response();
+ }
+ catch(...)
+ {
+ throw current_exception();
+ }
+ }
+ }
+ }
+ catch(const exception_ptr& ex)
+ {
+ rethrow_exception(ex);
+ }
+ catch(const Ice::Exception&)
+ {
+ if(__exception)
+ {
+ __exception(current_exception());
+ }
+ }
+}
+
+function<void ()>
+OnewayClosureCallback::invoke(
+ const string& __name,
+ const shared_ptr<Ice::ObjectPrx>& __proxy,
+ Ice::OperationMode __mode,
+ Ice::FormatType __format,
+ function<void (BasicStream*)> __marshal,
+ function<void ()> __response,
+ function<void (exception_ptr)> __exception,
+ function<void (bool)> __sent,
+ const Ice::Context& __context)
+{
+ auto __result = make_shared<OutgoingAsync>(__proxy, __name,
+ make_shared<OnewayClosureCallback>(__name, __proxy, move(__response), move(__exception), move(__sent)));
+ try
+ {
+ __result->prepare(__name, __mode, &__context);
+ if(__marshal)
+ {
+ __marshal(__result->startWriteParams(__format));
+ __result->endWriteParams();
+ }
+ else
+ {
+ __result->writeEmptyParams();
+ }
+ __result->invoke();
+ }
+ catch(const exception_ptr& ex)
+ {
+ rethrow_exception(ex);
+ }
+ catch(const Exception& __ex)
+ {
+ __result->abort(__ex);
+ }
+
+ return [__result]()
+ {
+ __result->cancel();
+ };
+}
+
+TwowayClosureCallback::TwowayClosureCallback(
+ const string& name,
+ const shared_ptr<Ice::ObjectPrx>& proxy,
+ bool readEmptyParams,
+ function<void (BasicStream*)> read,
+ function<void (const UserException&)> userException,
+ function<void (exception_ptr)> exception,
+ function<void (bool)> sent) :
+ __name(name),
+ __proxy(proxy),
+ __readEmptyParams(readEmptyParams),
+ __read(move(read)),
+ __userException(move(userException)),
+ __exception(move(exception)),
+ __sent(move(sent))
+{
+}
+
+void
+TwowayClosureCallback::sent(const AsyncResultPtr& result) const
+{
+ if(__sent != nullptr)
+ {
+ __sent(result->sentSynchronously());
+ }
+}
+
+bool
+TwowayClosureCallback::hasSentCallback() const
+{
+ return __sent != nullptr;
+}
+
+void
+TwowayClosureCallback::completed(const AsyncResultPtr& __result) const
+{
+ try
+ {
+ AsyncResult::__check(__result, __proxy.get(), __name);
+ if(!__result->__wait())
+ {
+ try
+ {
+ __result->__throwUserException();
+ }
+ catch(const Ice::UserException& __ex)
+ {
+ if(__userException)
+ {
+ __userException(__ex);
+ }
+ throw ::Ice::UnknownUserException(__FILE__, __LINE__, __ex.ice_name());
+ }
+ }
+ else
+ {
+ if(__readEmptyParams)
+ {
+ __result->__readEmptyParams();
+ __read(0);
+ }
+ else
+ {
+ __read(__result->__startReadParams());
+ }
+ }
+ }
+ catch(const exception_ptr& ex)
+ {
+ rethrow_exception(ex);
+ }
+ catch(const Ice::Exception&)
+ {
+ if(__exception)
+ {
+ __exception(current_exception());
+ }
+ }
+}
+
+function<void ()>
+TwowayClosureCallback::invoke(
+ const string& __name,
+ const shared_ptr<Ice::ObjectPrx>& __proxy,
+ OperationMode __mode,
+ FormatType __format,
+ function<void (BasicStream*)> __write,
+ bool __readEmptyParams,
+ function<void (BasicStream*)> __read,
+ function<void (const UserException&)> __userException,
+ function<void (exception_ptr)> __exception,
+ function<void (bool)> __sent,
+ const Context& __context)
+{
+ assert(__proxy);
+ auto __result = make_shared<OutgoingAsync>(__proxy, __name,
+ make_shared<TwowayClosureCallback>(__name, __proxy, __readEmptyParams, move(__read),
+ move(__userException), move(__exception), move(__sent)));
+ __proxy->__checkAsyncTwowayOnly(__name);
+ try
+ {
+ __result->prepare(__name, __mode, &__context);
+ if(__write)
+ {
+ __write(__result->startWriteParams(__format));
+ __result->endWriteParams();
+ }
+ else
+ {
+ __result->writeEmptyParams();
+ }
+ __result->invoke();
+ }
+ catch(const Exception& __ex)
+ {
+ __result->abort(__ex);
+ }
+
+ return [__result]()
+ {
+ __result->cancel();
+ };
+}
+#endif