diff options
Diffstat (limited to 'cpp/src/IceGrid/ReapThread.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReapThread.cpp | 167 |
1 files changed, 64 insertions, 103 deletions
diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 54626b1cdbf..649c5092795 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -8,54 +8,11 @@ using namespace std; using namespace IceGrid; -namespace -{ - -class CloseCallbackI : public Ice::CloseCallback -{ -public: - - CloseCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) - { - } - - virtual void - closed(const Ice::ConnectionPtr& con) - { - _reaper->connectionClosed(con); - } - -private: - - const ReapThreadPtr _reaper; -}; - -class HeartbeatCallbackI : public Ice::HeartbeatCallback -{ -public: - - HeartbeatCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) - { - } - - virtual void - heartbeat(const Ice::ConnectionPtr& con) - { - _reaper->connectionHeartbeat(con); - } - -private: - - const ReapThreadPtr _reaper; -}; - -} - ReapThread::ReapThread() : - IceUtil::Thread("Icegrid reaper thread"), - _closeCallback(new CloseCallbackI(this)), - _heartbeatCallback(new HeartbeatCallbackI(this)), - _terminated(false) + _closeCallback([this](const auto& con) { connectionClosed(con); }), + _heartbeatCallback([this](const auto& con) { connectionHeartbeat(con); }), + _terminated(false), + _thread([this] { run(); }) { } @@ -66,7 +23,7 @@ ReapThread::run() while(true) { { - Lock sync(*this); + unique_lock lock(_mutex); if(_terminated) { break; @@ -77,13 +34,13 @@ ReapThread::run() // // If the wake interval is zero then we wait forever. // - if(_wakeInterval == IceUtil::Time()) + if(_wakeInterval == 0s) { - wait(); + _condVar.wait(lock); } else { - timedWait(_wakeInterval); + _condVar.wait_for(lock, _wakeInterval); } if(_terminated) @@ -91,18 +48,18 @@ ReapThread::run() break; } - list<ReapableItem>::iterator p = _sessions.begin(); + auto p = _sessions.begin(); while(p != _sessions.end()) { try { - if(p->timeout == IceUtil::Time()) + if(p->timeout == 0s) { p->item->timestamp(); // This should throw if the reapable is destroyed. ++p; continue; } - else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout) + else if((chrono::steady_clock::now() - p->item->timestamp()) > p->timeout) { reap.push_back(*p); } @@ -121,14 +78,14 @@ ReapThread::run() // if(p->connection) { - map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator q = _connections.find(p->connection); + auto q = _connections.find(p->connection); if(q != _connections.end()) { q->second.erase(p->item); if(q->second.empty()) { - p->connection->setCloseCallback(0); - p->connection->setHeartbeatCallback(0); + p->connection->setCloseCallback(nullptr); + p->connection->setHeartbeatCallback(nullptr); _connections.erase(q); } } @@ -137,9 +94,9 @@ ReapThread::run() } } - for(vector<ReapableItem>::const_iterator p = reap.begin(); p != reap.end(); ++p) + for(const auto& r : reap) { - p->item->destroy(false); + r.item->destroy(false); } reap.clear(); } @@ -150,36 +107,43 @@ ReapThread::terminate() { list<ReapableItem> reap; { - Lock sync(*this); + lock_guard lock(_mutex); if(_terminated) { assert(_sessions.empty()); return; } _terminated = true; - notify(); + _condVar.notify_one(); reap.swap(_sessions); - for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p) + for(const auto& conn : _connections) { - p->first->setCloseCallback(0); - p->first->setHeartbeatCallback(0); + conn.first->setCloseCallback(nullptr); + conn.first->setHeartbeatCallback(nullptr); } _connections.clear(); - _closeCallback = 0; - _heartbeatCallback = 0; + _closeCallback = nullptr; + _heartbeatCallback = nullptr; } - for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p) + for(const auto& r : reap) { - p->item->destroy(true); + r.item->destroy(true); } } void -ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection) +ReapThread::join() { - Lock sync(*this); + _thread.join(); +} + +void +ReapThread::add(const shared_ptr<Reapable>& reapable, chrono::seconds timeout, + const shared_ptr<Ice::Connection>& connection) +{ + lock_guard lock(_mutex); if(_terminated) { return; @@ -193,31 +157,26 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP // // 10 seconds is the minimum permissable timeout. // - if(timeout > 0 && timeout < 10) + if(timeout > 0s && timeout < 10s) { - timeout = 10; + timeout = 10s; } - ReapableItem item; - item.item = reapable; - item.connection = connection; - item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout); - _sessions.push_back(item); + _sessions.push_back({ reapable, connection, timeout }); if(connection) { - map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(connection); + auto p = _connections.find(connection); if(p == _connections.end()) { - p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first; + p = _connections.insert({connection, {} }).first; connection->setCloseCallback(_closeCallback); connection->setHeartbeatCallback(_heartbeatCallback); - } p->second.insert(reapable); } - if(timeout > 0) + if(timeout > 0s) { // // If there is a new minimum wake interval then wake the reaping @@ -225,50 +184,52 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP // if(calcWakeInterval()) { - notify(); + _condVar.notify_one(); } // // Since we just added a new session with a non null timeout there // must be a non-zero wakeInterval. // - assert(_wakeInterval != IceUtil::Time()); + assert(_wakeInterval != 0s); } } void -ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) +ReapThread::connectionHeartbeat(const shared_ptr<Ice::Connection>& con) { - Lock sync(*this); - map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con); + lock_guard lock(_mutex); + + auto p = _connections.find(con); if(p == _connections.end()) { - con->setCloseCallback(0); - con->setHeartbeatCallback(0); + con->setCloseCallback(nullptr); + con->setHeartbeatCallback(nullptr); return; } - for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + for(const auto& reapable : p->second) { - (*q)->heartbeat(); + reapable->heartbeat(); } } void -ReapThread::connectionClosed(const Ice::ConnectionPtr& con) +ReapThread::connectionClosed(const shared_ptr<Ice::Connection>& con) { - Lock sync(*this); - map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con); + lock_guard lock(_mutex); + + auto p = _connections.find(con); if(p == _connections.end()) { - con->setCloseCallback(0); - con->setHeartbeatCallback(0); + con->setCloseCallback(nullptr); + con->setHeartbeatCallback(nullptr); return; } - for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + for(const auto& reapable : p->second) { - (*q)->destroy(false); + reapable->destroy(false); } _connections.erase(p); } @@ -281,18 +242,18 @@ bool ReapThread::calcWakeInterval() { // Re-calculate minimum timeout - IceUtil::Time oldWakeInterval = _wakeInterval; - IceUtil::Time minimum; + auto oldWakeInterval = _wakeInterval; + chrono::milliseconds minimum = 0s; bool first = true; - for(list<ReapableItem>::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + for(const auto& session : _sessions) { - if(p->timeout != IceUtil::Time() && (first || p->timeout < minimum)) + if(session.timeout != 0s && (first || session.timeout < minimum)) { - minimum = p->timeout; + minimum = session.timeout; first = false; } } _wakeInterval = minimum; - return oldWakeInterval == IceUtil::Time() || minimum < oldWakeInterval; + return oldWakeInterval == 0s || minimum < oldWakeInterval; } |