summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2015-12-01 17:36:19 +0100
committerJose <jose@zeroc.com>2015-12-01 17:36:19 +0100
commit2029ff368e49fae489b8ec8fc12f7d126b182dfb (patch)
tree6075d652b7851831c39e6f22004d8720864f1a60 /cpp/src/Ice/ConnectionI.cpp
parentThe default LMDB map size for IceGrid and IceStorm is now 10MB (Windows) (diff)
downloadice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.bz2
ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.tar.xz
ice-2029ff368e49fae489b8ec8fc12f7d126b182dfb.zip
C++11 mapping initial commit
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp274
1 files changed, 239 insertions, 35 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 4a078bcc1f8..a824fbf0107 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -38,7 +38,9 @@ using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
+#ifndef ICE_CPP11_MAPPING
Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; }
+#endif
namespace
{
@@ -353,7 +355,12 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
exception(ex);
if(callback)
{
+#ifdef ICE_CPP11_MAPPING
+ callback->connectionStartFailed(dynamic_pointer_cast<ConnectionI>(shared_from_this()),
+ *_exception.get());
+#else
callback->connectionStartFailed(this, *_exception.get());
+#endif
return;
}
else
@@ -365,7 +372,11 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback)
if(callback)
{
+#ifdef ICE_CPP11_MAPPING
+ callback->connectionStartCompleted(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
callback->connectionStartCompleted(this);
+#endif
}
}
@@ -725,8 +736,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres
// Notify the request that it's cancelable with this connection.
// This will throw if the request is canceled.
//
+#ifdef ICE_CPP11_MAPPING
+ out->cancelable(dynamic_pointer_cast<CancellationHandler>(shared_from_this()));
+#else
out->cancelable(this);
-
+#endif
Int requestId = 0;
if(response)
{
@@ -799,6 +813,81 @@ Ice::ConnectionI::flushBatchRequests()
out.invoke();
}
+#ifdef ICE_CPP11_MAPPING
+function<void ()>
+Ice::ConnectionI::flushBatchRequests_async(function<void ()>, function<void (exception_ptr)> exception,
+ function<void (bool)> sent)
+{
+ class FlushBatchRequestsCallback : public CallbackBase
+ {
+ public:
+
+ FlushBatchRequestsCallback(function<void (exception_ptr)> exception,
+ function<void (bool)> sent,
+ shared_ptr<Connection> connection) :
+ _exception(move(exception)),
+ _sent(move(sent)),
+ _connection(move(connection))
+ {
+ }
+
+ virtual void sent(const AsyncResultPtr& result) const
+ {
+ try
+ {
+ AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name);
+ result->__wait();
+ }
+ catch(const ::Ice::Exception&)
+ {
+ _exception(current_exception());
+ }
+
+ if(_sent)
+ {
+ _sent(result->sentSynchronously());
+ }
+ }
+
+ virtual bool hasSentCallback() const
+ {
+ return true;
+ }
+
+
+ virtual void
+ completed(const ::Ice::AsyncResultPtr& result) const
+ {
+ try
+ {
+ AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name);
+ result->__wait();
+ }
+ catch(const ::Ice::Exception&)
+ {
+ _exception(current_exception());
+ }
+ }
+
+ private:
+
+ function<void (exception_ptr)> _exception;
+ function<void (bool)> _sent;
+ shared_ptr<Connection> _connection;
+ };
+
+ auto self = dynamic_pointer_cast<ConnectionI>(shared_from_this());
+
+ auto result = make_shared<ConnectionFlushBatchAsync>(self, _communicator, _instance, __flushBatchRequests_name,
+ make_shared<FlushBatchRequestsCallback>(move(exception), move(sent), self));
+ result->invoke();
+ return [result]()
+ {
+ result->cancel();
+ };
+}
+#else
+
AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests()
{
@@ -822,7 +911,7 @@ AsyncResultPtr
Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception,
const IceInternal::Function<void (bool)>& sent)
{
-#ifdef ICE_CPP11
+#ifdef ICE_CPP11_COMPILER
class Cpp11CB : public IceInternal::Cpp11FnCallbackNC
{
public:
@@ -851,7 +940,7 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con
}
};
- return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0);
+ return __begin_flushBatchRequests(ICE_MAKE_SHARED(Cpp11CB, exception, sent), 0);
#else
assert(false); // Ice not built with C++11 support.
return 0;
@@ -861,12 +950,17 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con
AsyncResultPtr
Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
- ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this,
- _communicator,
- _instance,
- __flushBatchRequests_name,
- cb,
- cookie);
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(
+#ifdef ICE_CPP11_MAPPING
+ dynamic_pointer_cast<EventHandler>(shared_from_this()),
+#else
+ this,
+#endif
+ _communicator,
+ _instance,
+ __flushBatchRequests_name,
+ cb,
+ cookie);
result->invoke();
return result;
}
@@ -877,6 +971,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
AsyncResult::__check(r, this, __flushBatchRequests_name);
r->__wait();
}
+#endif
void
Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
@@ -907,7 +1002,11 @@ Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
const ConnectionIPtr _connection;
const ConnectionCallbackPtr _callback;
};
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->dispatch(new CallbackWorkItem(dynamic_pointer_cast<ConnectionI>(shared_from_this()), callback));
+#else
_threadPool->dispatch(new CallbackWorkItem(this, callback));
+#endif
}
}
else
@@ -922,7 +1021,11 @@ Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback)
{
try
{
+#ifdef ICE_CPP11_MAPPING
+ callback->closed(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
callback->closed(this);
+#endif
}
catch(const std::exception& ex)
{
@@ -949,7 +1052,11 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
if(_state == StateActive)
{
+#ifdef ICE_CPP11_MAPPING
+ _monitor->remove(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
_monitor->remove(this);
+#endif
}
_monitor = _monitor->acm(timeout, close, heartbeat);
@@ -964,7 +1071,11 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout,
if(_state == StateActive)
{
+#ifdef ICE_CPP11_MAPPING
+ _monitor->add(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
_monitor->add(this);
+#endif
}
}
@@ -1089,7 +1200,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
if(o->requestId)
{
if(_asyncRequestsHint != _asyncRequests.end() &&
- _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
+ _asyncRequestsHint->second == ICE_DYNAMIC_CAST(OutgoingAsync, outAsync))
{
_asyncRequests.erase(_asyncRequestsHint);
_asyncRequestsHint = _asyncRequests.end();
@@ -1128,7 +1239,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
}
- if(OutgoingAsyncPtr::dynamicCast(outAsync))
+ if(ICE_DYNAMIC_CAST(OutgoingAsync, outAsync))
{
if(_asyncRequestsHint != _asyncRequests.end())
{
@@ -1335,14 +1446,19 @@ Ice::ConnectionI::getEndpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
-ObjectPrx
+ObjectPrxPtr
Ice::ConnectionI::createProxy(const Identity& ident) const
{
//
// Create a reference and return a reverse proxy for this
// reference.
//
- ConnectionIPtr self = const_cast<ConnectionI*>(this);
+ ConnectionIPtr self =
+#ifdef ICE_CPP11_MAPPING
+ dynamic_pointer_cast<ConnectionI>(const_pointer_cast<VirtualShared>(shared_from_this()));
+#else
+ const_cast<ConnectionI*>(this);
+#endif
return _instance->proxyFactory()->referenceToProxy(_instance->referenceFactory()->create(ident, self));
}
@@ -1461,7 +1577,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1593,7 +1708,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
// satisfied before continuing.
//
scheduleTimeout(newOp);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()),
+ current.operation, newOp);
+#else
_threadPool->update(this, current.operation, newOp);
+#endif
return;
}
@@ -1607,7 +1727,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
return;
}
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->unregister(dynamic_pointer_cast<EventHandler>(shared_from_this()),
+ current.operation);
+#else
_threadPool->unregister(this, current.operation);
+#endif
//
// We start out in holding state.
@@ -1655,7 +1780,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
if(_state < StateClosed)
{
scheduleTimeout(newOp);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), current.operation,
+ newOp);
+#else
_threadPool->update(this, current.operation, newOp);
+#endif
}
}
@@ -1717,9 +1847,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
else
{
- _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum,
- servantManager, adapter, outAsync, heartbeatCallback,
- current.stream));
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->dispatchFromThisThread(new DispatchCall(dynamic_pointer_cast<ConnectionI>(shared_from_this()),
+ startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback,
+ current.stream));
+#else
+ _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId,
+ invokeNum, servantManager, adapter, outAsync, heartbeatCallback, current.stream));
+#endif
+
}
}
@@ -1737,7 +1873,12 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(startCB)
{
+
+#ifdef ICE_CPP11_MAPPING
+ startCB->connectionStartCompleted(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
startCB->connectionStartCompleted(this);
+#endif
++dispatchedCount;
}
@@ -1755,7 +1896,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
+ OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync);
if(outAsync->completed())
{
outAsync->invokeCompleted();
@@ -1782,7 +1923,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
{
try
{
+#ifdef ICE_CPP11_MAPPING
+ heartbeatCallback->heartbeat(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
heartbeatCallback->heartbeat(this);
+#endif
}
catch(const std::exception& ex)
{
@@ -1874,7 +2019,12 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close)
}
else
{
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->dispatchFromThisThread(new FinishCall(
+ dynamic_pointer_cast<ConnectionI>(shared_from_this()), close));
+#else
_threadPool->dispatchFromThisThread(new FinishCall(this, close));
+#endif
}
}
@@ -1919,7 +2069,12 @@ Ice::ConnectionI::finish(bool close)
if(_startCallback)
{
+#ifdef ICE_CPP11_MAPPING
+ _startCallback->connectionStartFailed(
+ dynamic_pointer_cast<ConnectionI>(shared_from_this()), *_exception.get());
+#else
_startCallback->connectionStartFailed(this, *_exception.get());
+#endif
_startCallback = 0;
}
@@ -1948,7 +2103,7 @@ Ice::ConnectionI::finish(bool close)
}
if(message->receivedReply)
{
- OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
+ OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync);
if(outAsync->completed())
{
outAsync->invokeCompleted();
@@ -2156,26 +2311,29 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
{
_acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
+}
- __setNoDelete(true);
- try
+Ice::ConnectionIPtr
+Ice::ConnectionI::create(const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const ACMMonitorPtr& monitor,
+ const TransceiverPtr& transceiver,
+ const ConnectorPtr& connector,
+ const EndpointIPtr& endpoint,
+ const ObjectAdapterIPtr& adapter)
+{
+ Ice::ConnectionIPtr conn(new ConnectionI(communicator, instance, monitor, transceiver, connector,
+ endpoint, adapter));
+ if(adapter)
{
- if(adapter)
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = adapter->getThreadPool();
- }
- else
- {
- const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
- }
- _threadPool->initialize(this);
+ const_cast<ThreadPoolPtr&>(conn->_threadPool) = adapter->getThreadPool();
}
- catch(const IceUtil::Exception&)
+ else
{
- __setNoDelete(false);
- throw;
+ const_cast<ThreadPoolPtr&>(conn->_threadPool) = conn->_instance->clientThreadPool();
}
- __setNoDelete(false);
+ conn->_threadPool->initialize(conn);
+ return conn;
}
Ice::ConnectionI::~ConnectionI()
@@ -2296,7 +2454,12 @@ Ice::ConnectionI::setState(State state)
{
return;
}
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()),
+ SocketOperationRead);
+#else
_threadPool->_register(this, SocketOperationRead);
+#endif
break;
}
@@ -2312,7 +2475,12 @@ Ice::ConnectionI::setState(State state)
}
if(_state == StateActive)
{
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->unregister(dynamic_pointer_cast<EventHandler>(shared_from_this()),
+ SocketOperationRead);
+#else
_threadPool->unregister(this, SocketOperationRead);
+#endif
}
break;
}
@@ -2343,7 +2511,11 @@ Ice::ConnectionI::setState(State state)
// Don't need to close now for connections so only close the transceiver
// if the selector request it.
//
+#ifdef ICE_CPP11_MAPPING
+ if(_threadPool->finish(dynamic_pointer_cast<EventHandler>(shared_from_this()), false))
+#else
if(_threadPool->finish(this, false))
+#endif
{
_transceiver->close();
}
@@ -2378,11 +2550,19 @@ Ice::ConnectionI::setState(State state)
{
_acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
}
+#ifdef ICE_CPP11_MAPPING
+ _monitor->add(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
_monitor->add(this);
+#endif
}
else if(_state == StateActive)
{
+#ifdef ICE_CPP11_MAPPING
+ _monitor->remove(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
_monitor->remove(this);
+#endif
}
}
@@ -2467,7 +2647,11 @@ Ice::ConnectionI::initiateShutdown()
if(op)
{
scheduleTimeout(op);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()), op);
+#else
_threadPool->_register(this, op);
+#endif
}
}
}
@@ -2511,7 +2695,11 @@ Ice::ConnectionI::initialize(SocketOperation operation)
if(s != SocketOperationNone)
{
scheduleTimeout(s);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, s);
+#else
_threadPool->update(this, operation, s);
+#endif
return false;
}
@@ -2557,7 +2745,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(op)
{
scheduleTimeout(op);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, op);
+#else
_threadPool->update(this, operation, op);
+#endif
return false;
}
}
@@ -2586,7 +2778,11 @@ Ice::ConnectionI::validate(SocketOperation operation)
if(op)
{
scheduleTimeout(op);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, op);
+#else
_threadPool->update(this, operation, op);
+#endif
return false;
}
}
@@ -2964,7 +3160,11 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message)
_writeStream.swap(*_sendStreams.back().stream);
scheduleTimeout(op);
+#ifdef ICE_CPP11_MAPPING
+ _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()), op);
+#else
_threadPool->_register(this, op);
+#endif
return AsyncStatusQueued;
}
@@ -3520,7 +3720,7 @@ Ice::ConnectionI::initConnectionInfo() const
}
catch(const Ice::LocalException&)
{
- _info = new ConnectionInfo();
+ _info = ICE_MAKE_SHARED(ConnectionInfo);
}
_info->connectionId = _endpoint->connectionId();
_info->incoming = _connector == 0;
@@ -3579,7 +3779,11 @@ ConnectionI::reap()
{
if(_monitor)
{
+#ifdef ICE_CPP11_MAPPING
+ _monitor->reap(dynamic_pointer_cast<ConnectionI>(shared_from_this()));
+#else
_monitor->reap(this);
+#endif
}
if(_observer)
{