diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 274 |
1 files changed, 239 insertions, 35 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 4a078bcc1f8..a824fbf0107 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -38,7 +38,9 @@ using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; +#ifndef ICE_CPP11_MAPPING Ice::LocalObject* Ice::upCast(ConnectionI* p) { return p; } +#endif namespace { @@ -353,7 +355,12 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) exception(ex); if(callback) { +#ifdef ICE_CPP11_MAPPING + callback->connectionStartFailed(dynamic_pointer_cast<ConnectionI>(shared_from_this()), + *_exception.get()); +#else callback->connectionStartFailed(this, *_exception.get()); +#endif return; } else @@ -365,7 +372,11 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) if(callback) { +#ifdef ICE_CPP11_MAPPING + callback->connectionStartCompleted(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else callback->connectionStartCompleted(this); +#endif } } @@ -725,8 +736,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres // Notify the request that it's cancelable with this connection. // This will throw if the request is canceled. // +#ifdef ICE_CPP11_MAPPING + out->cancelable(dynamic_pointer_cast<CancellationHandler>(shared_from_this())); +#else out->cancelable(this); - +#endif Int requestId = 0; if(response) { @@ -799,6 +813,81 @@ Ice::ConnectionI::flushBatchRequests() out.invoke(); } +#ifdef ICE_CPP11_MAPPING +function<void ()> +Ice::ConnectionI::flushBatchRequests_async(function<void ()>, function<void (exception_ptr)> exception, + function<void (bool)> sent) +{ + class FlushBatchRequestsCallback : public CallbackBase + { + public: + + FlushBatchRequestsCallback(function<void (exception_ptr)> exception, + function<void (bool)> sent, + shared_ptr<Connection> connection) : + _exception(move(exception)), + _sent(move(sent)), + _connection(move(connection)) + { + } + + virtual void sent(const AsyncResultPtr& result) const + { + try + { + AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name); + result->__wait(); + } + catch(const ::Ice::Exception&) + { + _exception(current_exception()); + } + + if(_sent) + { + _sent(result->sentSynchronously()); + } + } + + virtual bool hasSentCallback() const + { + return true; + } + + + virtual void + completed(const ::Ice::AsyncResultPtr& result) const + { + try + { + AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name); + result->__wait(); + } + catch(const ::Ice::Exception&) + { + _exception(current_exception()); + } + } + + private: + + function<void (exception_ptr)> _exception; + function<void (bool)> _sent; + shared_ptr<Connection> _connection; + }; + + auto self = dynamic_pointer_cast<ConnectionI>(shared_from_this()); + + auto result = make_shared<ConnectionFlushBatchAsync>(self, _communicator, _instance, __flushBatchRequests_name, + make_shared<FlushBatchRequestsCallback>(move(exception), move(sent), self)); + result->invoke(); + return [result]() + { + result->cancel(); + }; +} +#else + AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests() { @@ -822,7 +911,7 @@ AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, const IceInternal::Function<void (bool)>& sent) { -#ifdef ICE_CPP11 +#ifdef ICE_CPP11_COMPILER class Cpp11CB : public IceInternal::Cpp11FnCallbackNC { public: @@ -851,7 +940,7 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con } }; - return __begin_flushBatchRequests(new Cpp11CB(exception, sent), 0); + return __begin_flushBatchRequests(ICE_MAKE_SHARED(Cpp11CB, exception, sent), 0); #else assert(false); // Ice not built with C++11 support. return 0; @@ -861,12 +950,17 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this, - _communicator, - _instance, - __flushBatchRequests_name, - cb, - cookie); + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync( +#ifdef ICE_CPP11_MAPPING + dynamic_pointer_cast<EventHandler>(shared_from_this()), +#else + this, +#endif + _communicator, + _instance, + __flushBatchRequests_name, + cb, + cookie); result->invoke(); return result; } @@ -877,6 +971,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) AsyncResult::__check(r, this, __flushBatchRequests_name); r->__wait(); } +#endif void Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) @@ -907,7 +1002,11 @@ Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) const ConnectionIPtr _connection; const ConnectionCallbackPtr _callback; }; +#ifdef ICE_CPP11_MAPPING + _threadPool->dispatch(new CallbackWorkItem(dynamic_pointer_cast<ConnectionI>(shared_from_this()), callback)); +#else _threadPool->dispatch(new CallbackWorkItem(this, callback)); +#endif } } else @@ -922,7 +1021,11 @@ Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback) { try { +#ifdef ICE_CPP11_MAPPING + callback->closed(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else callback->closed(this); +#endif } catch(const std::exception& ex) { @@ -949,7 +1052,11 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_state == StateActive) { +#ifdef ICE_CPP11_MAPPING + _monitor->remove(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else _monitor->remove(this); +#endif } _monitor = _monitor->acm(timeout, close, heartbeat); @@ -964,7 +1071,11 @@ Ice::ConnectionI::setACM(const IceUtil::Optional<int>& timeout, if(_state == StateActive) { +#ifdef ICE_CPP11_MAPPING + _monitor->add(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else _monitor->add(this); +#endif } } @@ -1089,7 +1200,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con if(o->requestId) { if(_asyncRequestsHint != _asyncRequests.end() && - _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync)) + _asyncRequestsHint->second == ICE_DYNAMIC_CAST(OutgoingAsync, outAsync)) { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); @@ -1128,7 +1239,7 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } } - if(OutgoingAsyncPtr::dynamicCast(outAsync)) + if(ICE_DYNAMIC_CAST(OutgoingAsync, outAsync)) { if(_asyncRequestsHint != _asyncRequests.end()) { @@ -1335,14 +1446,19 @@ Ice::ConnectionI::getEndpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } -ObjectPrx +ObjectPrxPtr Ice::ConnectionI::createProxy(const Identity& ident) const { // // Create a reference and return a reverse proxy for this // reference. // - ConnectionIPtr self = const_cast<ConnectionI*>(this); + ConnectionIPtr self = +#ifdef ICE_CPP11_MAPPING + dynamic_pointer_cast<ConnectionI>(const_pointer_cast<VirtualShared>(shared_from_this())); +#else + const_cast<ConnectionI*>(this); +#endif return _instance->proxyFactory()->referenceToProxy(_instance->referenceFactory()->create(ident, self)); } @@ -1461,7 +1577,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); - { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1593,7 +1708,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // satisfied before continuing. // scheduleTimeout(newOp); +#ifdef ICE_CPP11_MAPPING + _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), + current.operation, newOp); +#else _threadPool->update(this, current.operation, newOp); +#endif return; } @@ -1607,7 +1727,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } +#ifdef ICE_CPP11_MAPPING + _threadPool->unregister(dynamic_pointer_cast<EventHandler>(shared_from_this()), + current.operation); +#else _threadPool->unregister(this, current.operation); +#endif // // We start out in holding state. @@ -1655,7 +1780,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) if(_state < StateClosed) { scheduleTimeout(newOp); +#ifdef ICE_CPP11_MAPPING + _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), current.operation, + newOp); +#else _threadPool->update(this, current.operation, newOp); +#endif } } @@ -1717,9 +1847,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { - _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, invokeNum, - servantManager, adapter, outAsync, heartbeatCallback, - current.stream)); +#ifdef ICE_CPP11_MAPPING + _threadPool->dispatchFromThisThread(new DispatchCall(dynamic_pointer_cast<ConnectionI>(shared_from_this()), + startCB, sentCBs, compress, requestId, invokeNum, servantManager, adapter, outAsync, heartbeatCallback, + current.stream)); +#else + _threadPool->dispatchFromThisThread(new DispatchCall(this, startCB, sentCBs, compress, requestId, + invokeNum, servantManager, adapter, outAsync, heartbeatCallback, current.stream)); +#endif + } } @@ -1737,7 +1873,12 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(startCB) { + +#ifdef ICE_CPP11_MAPPING + startCB->connectionStartCompleted(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else startCB->connectionStartCompleted(this); +#endif ++dispatchedCount; } @@ -1755,7 +1896,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } if(p->receivedReply) { - OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); + OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync); if(outAsync->completed()) { outAsync->invokeCompleted(); @@ -1782,7 +1923,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess { try { +#ifdef ICE_CPP11_MAPPING + heartbeatCallback->heartbeat(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else heartbeatCallback->heartbeat(this); +#endif } catch(const std::exception& ex) { @@ -1874,7 +2019,12 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) } else { +#ifdef ICE_CPP11_MAPPING + _threadPool->dispatchFromThisThread(new FinishCall( + dynamic_pointer_cast<ConnectionI>(shared_from_this()), close)); +#else _threadPool->dispatchFromThisThread(new FinishCall(this, close)); +#endif } } @@ -1919,7 +2069,12 @@ Ice::ConnectionI::finish(bool close) if(_startCallback) { +#ifdef ICE_CPP11_MAPPING + _startCallback->connectionStartFailed( + dynamic_pointer_cast<ConnectionI>(shared_from_this()), *_exception.get()); +#else _startCallback->connectionStartFailed(this, *_exception.get()); +#endif _startCallback = 0; } @@ -1948,7 +2103,7 @@ Ice::ConnectionI::finish(bool close) } if(message->receivedReply) { - OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); + OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync); if(outAsync->completed()) { outAsync->invokeCompleted(); @@ -2156,26 +2311,29 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } +} - __setNoDelete(true); - try +Ice::ConnectionIPtr +Ice::ConnectionI::create(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const ACMMonitorPtr& monitor, + const TransceiverPtr& transceiver, + const ConnectorPtr& connector, + const EndpointIPtr& endpoint, + const ObjectAdapterIPtr& adapter) +{ + Ice::ConnectionIPtr conn(new ConnectionI(communicator, instance, monitor, transceiver, connector, + endpoint, adapter)); + if(adapter) { - if(adapter) - { - const_cast<ThreadPoolPtr&>(_threadPool) = adapter->getThreadPool(); - } - else - { - const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); - } - _threadPool->initialize(this); + const_cast<ThreadPoolPtr&>(conn->_threadPool) = adapter->getThreadPool(); } - catch(const IceUtil::Exception&) + else { - __setNoDelete(false); - throw; + const_cast<ThreadPoolPtr&>(conn->_threadPool) = conn->_instance->clientThreadPool(); } - __setNoDelete(false); + conn->_threadPool->initialize(conn); + return conn; } Ice::ConnectionI::~ConnectionI() @@ -2296,7 +2454,12 @@ Ice::ConnectionI::setState(State state) { return; } +#ifdef ICE_CPP11_MAPPING + _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()), + SocketOperationRead); +#else _threadPool->_register(this, SocketOperationRead); +#endif break; } @@ -2312,7 +2475,12 @@ Ice::ConnectionI::setState(State state) } if(_state == StateActive) { +#ifdef ICE_CPP11_MAPPING + _threadPool->unregister(dynamic_pointer_cast<EventHandler>(shared_from_this()), + SocketOperationRead); +#else _threadPool->unregister(this, SocketOperationRead); +#endif } break; } @@ -2343,7 +2511,11 @@ Ice::ConnectionI::setState(State state) // Don't need to close now for connections so only close the transceiver // if the selector request it. // +#ifdef ICE_CPP11_MAPPING + if(_threadPool->finish(dynamic_pointer_cast<EventHandler>(shared_from_this()), false)) +#else if(_threadPool->finish(this, false)) +#endif { _transceiver->close(); } @@ -2378,11 +2550,19 @@ Ice::ConnectionI::setState(State state) { _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); } +#ifdef ICE_CPP11_MAPPING + _monitor->add(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else _monitor->add(this); +#endif } else if(_state == StateActive) { +#ifdef ICE_CPP11_MAPPING + _monitor->remove(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else _monitor->remove(this); +#endif } } @@ -2467,7 +2647,11 @@ Ice::ConnectionI::initiateShutdown() if(op) { scheduleTimeout(op); +#ifdef ICE_CPP11_MAPPING + _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()), op); +#else _threadPool->_register(this, op); +#endif } } } @@ -2511,7 +2695,11 @@ Ice::ConnectionI::initialize(SocketOperation operation) if(s != SocketOperationNone) { scheduleTimeout(s); +#ifdef ICE_CPP11_MAPPING + _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, s); +#else _threadPool->update(this, operation, s); +#endif return false; } @@ -2557,7 +2745,11 @@ Ice::ConnectionI::validate(SocketOperation operation) if(op) { scheduleTimeout(op); +#ifdef ICE_CPP11_MAPPING + _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, op); +#else _threadPool->update(this, operation, op); +#endif return false; } } @@ -2586,7 +2778,11 @@ Ice::ConnectionI::validate(SocketOperation operation) if(op) { scheduleTimeout(op); +#ifdef ICE_CPP11_MAPPING + _threadPool->update(dynamic_pointer_cast<EventHandler>(shared_from_this()), operation, op); +#else _threadPool->update(this, operation, op); +#endif return false; } } @@ -2964,7 +3160,11 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) _writeStream.swap(*_sendStreams.back().stream); scheduleTimeout(op); +#ifdef ICE_CPP11_MAPPING + _threadPool->_register(dynamic_pointer_cast<EventHandler>(shared_from_this()), op); +#else _threadPool->_register(this, op); +#endif return AsyncStatusQueued; } @@ -3520,7 +3720,7 @@ Ice::ConnectionI::initConnectionInfo() const } catch(const Ice::LocalException&) { - _info = new ConnectionInfo(); + _info = ICE_MAKE_SHARED(ConnectionInfo); } _info->connectionId = _endpoint->connectionId(); _info->incoming = _connector == 0; @@ -3579,7 +3779,11 @@ ConnectionI::reap() { if(_monitor) { +#ifdef ICE_CPP11_MAPPING + _monitor->reap(dynamic_pointer_cast<ConnectionI>(shared_from_this())); +#else _monitor->reap(this); +#endif } if(_observer) { |