diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 104 |
1 files changed, 63 insertions, 41 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index afd26ebdb32..f86bf0e6b58 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -47,7 +47,6 @@ namespace const ::std::string __flushBatchRequests_name = "flushBatchRequests"; - class TimeoutCallback : public IceUtil::TimerTask { public: @@ -74,7 +73,7 @@ public: DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + const OutgoingAsyncBasePtr& outAsync, const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), _connection(connection), @@ -110,7 +109,7 @@ private: const ServantManagerPtr _servantManager; const ObjectAdapterPtr _adapter; const OutgoingAsyncBasePtr _outAsync; - const ConnectionCallbackPtr _heartbeatCallback; + const ICE_HEARTBEAT_CALLBACK _heartbeatCallback; BasicStream _stream; }; @@ -540,7 +539,7 @@ Ice::ConnectionI::updateObserver() } assert(_instance->initializationData().observer); - + ConnectionObserverPtr o = _instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), _endpoint, toConnectionState(_state), @@ -959,50 +958,66 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) #endif void -Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setHeartbeatCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback) +#endif { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _heartbeatCallback = callback; +} +void +#ifdef ICE_CPP11_MAPPING +Ice::ConnectionI::setCloseCallback(::std::function<void (const ::std::shared_ptr<::Ice::Connection>&)> callback) +#else +Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback) +#endif +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state >= StateClosed) + if(callback) { - if(callback) + class CallbackWorkItem : public DispatchWorkItem { - class CallbackWorkItem : public DispatchWorkItem - { - public: + public: - CallbackWorkItem(const ConnectionIPtr& connection, const ConnectionCallbackPtr& callback) : - _connection(connection), - _callback(callback) - { - } + CallbackWorkItem(const ConnectionIPtr& connection, const ICE_CLOSE_CALLBACK& callback) : + _connection(connection), + _callback(callback) + { + } - virtual void run() - { - _connection->closeCallback(_callback); - } + virtual void run() + { + _connection->closeCallback(_callback); + } - private: + private: - const ConnectionIPtr _connection; - const ConnectionCallbackPtr _callback; - }; - _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); - } - } - else - { - _callback = callback; + const ConnectionIPtr _connection; + const ICE_CLOSE_CALLBACK _callback; + }; + _threadPool->dispatch(new CallbackWorkItem(shared_from_this(), callback)); } } + else + { + _closeCallback = callback; + } } void -Ice::ConnectionI::closeCallback(const ConnectionCallbackPtr& callback) +Ice::ConnectionI::closeCallback(const ICE_CLOSE_CALLBACK& callback) { try { +#ifdef ICE_CPP11_MAPPING + callback(shared_from_this()); +#else callback->closed(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1537,7 +1552,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ServantManagerPtr servantManager; ObjectAdapterPtr adapter; OutgoingAsyncBasePtr outAsync; - ConnectionCallbackPtr heartbeatCallback; + ICE_HEARTBEAT_CALLBACK heartbeatCallback; int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); @@ -1807,7 +1822,7 @@ void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync, - const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) + const ICE_HEARTBEAT_CALLBACK& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -1862,7 +1877,11 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess { try { +#ifdef ICE_CPP11_MAPPING + heartbeatCallback(shared_from_this()); +#else heartbeatCallback->heartbeat(shared_from_this()); +#endif } catch(const std::exception& ex) { @@ -1941,7 +1960,7 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) // to call code that will potentially block (this avoids promoting a new leader and // unecessary thread creation, especially if this is called on shutdown). // - if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) + if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_closeCallback && !_heartbeatCallback) { finish(close); return; @@ -2080,12 +2099,14 @@ Ice::ConnectionI::finish(bool close) _readStream.clear(); _readStream.b.clear(); - if(_callback) + if(_closeCallback) { - closeCallback(_callback); - _callback = 0; + closeCallback(_closeCallback); + _closeCallback = 0; } + _heartbeatCallback = 0; + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -2264,7 +2285,8 @@ Ice::ConnectionI::create(const CommunicatorPtr& communicator, Ice::ConnectionI::~ConnectionI() { assert(!_startCallback); - assert(!_callback); + assert(!_closeCallback); + assert(!_heartbeatCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); assert(_sendStreams.empty()); @@ -3206,7 +3228,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncBasePtr& outAsync, ICE_HEARTBEAT_CALLBACK& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3433,9 +3455,9 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request case validateConnectionMsg: { traceRecv(stream, _logger, _traceLevels); - if(_callback) + if(_heartbeatCallback) { - heartbeatCallback = _callback; + heartbeatCallback = _heartbeatCallback; ++dispatchCount; } break; |