summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReapThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReapThread.cpp')
-rw-r--r--cpp/src/IceGrid/ReapThread.cpp167
1 files changed, 64 insertions, 103 deletions
diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp
index 54626b1cdbf..649c5092795 100644
--- a/cpp/src/IceGrid/ReapThread.cpp
+++ b/cpp/src/IceGrid/ReapThread.cpp
@@ -8,54 +8,11 @@
using namespace std;
using namespace IceGrid;
-namespace
-{
-
-class CloseCallbackI : public Ice::CloseCallback
-{
-public:
-
- CloseCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper)
- {
- }
-
- virtual void
- closed(const Ice::ConnectionPtr& con)
- {
- _reaper->connectionClosed(con);
- }
-
-private:
-
- const ReapThreadPtr _reaper;
-};
-
-class HeartbeatCallbackI : public Ice::HeartbeatCallback
-{
-public:
-
- HeartbeatCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper)
- {
- }
-
- virtual void
- heartbeat(const Ice::ConnectionPtr& con)
- {
- _reaper->connectionHeartbeat(con);
- }
-
-private:
-
- const ReapThreadPtr _reaper;
-};
-
-}
-
ReapThread::ReapThread() :
- IceUtil::Thread("Icegrid reaper thread"),
- _closeCallback(new CloseCallbackI(this)),
- _heartbeatCallback(new HeartbeatCallbackI(this)),
- _terminated(false)
+ _closeCallback([this](const auto& con) { connectionClosed(con); }),
+ _heartbeatCallback([this](const auto& con) { connectionHeartbeat(con); }),
+ _terminated(false),
+ _thread([this] { run(); })
{
}
@@ -66,7 +23,7 @@ ReapThread::run()
while(true)
{
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
if(_terminated)
{
break;
@@ -77,13 +34,13 @@ ReapThread::run()
//
// If the wake interval is zero then we wait forever.
//
- if(_wakeInterval == IceUtil::Time())
+ if(_wakeInterval == 0s)
{
- wait();
+ _condVar.wait(lock);
}
else
{
- timedWait(_wakeInterval);
+ _condVar.wait_for(lock, _wakeInterval);
}
if(_terminated)
@@ -91,18 +48,18 @@ ReapThread::run()
break;
}
- list<ReapableItem>::iterator p = _sessions.begin();
+ auto p = _sessions.begin();
while(p != _sessions.end())
{
try
{
- if(p->timeout == IceUtil::Time())
+ if(p->timeout == 0s)
{
p->item->timestamp(); // This should throw if the reapable is destroyed.
++p;
continue;
}
- else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout)
+ else if((chrono::steady_clock::now() - p->item->timestamp()) > p->timeout)
{
reap.push_back(*p);
}
@@ -121,14 +78,14 @@ ReapThread::run()
//
if(p->connection)
{
- map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator q = _connections.find(p->connection);
+ auto q = _connections.find(p->connection);
if(q != _connections.end())
{
q->second.erase(p->item);
if(q->second.empty())
{
- p->connection->setCloseCallback(0);
- p->connection->setHeartbeatCallback(0);
+ p->connection->setCloseCallback(nullptr);
+ p->connection->setHeartbeatCallback(nullptr);
_connections.erase(q);
}
}
@@ -137,9 +94,9 @@ ReapThread::run()
}
}
- for(vector<ReapableItem>::const_iterator p = reap.begin(); p != reap.end(); ++p)
+ for(const auto& r : reap)
{
- p->item->destroy(false);
+ r.item->destroy(false);
}
reap.clear();
}
@@ -150,36 +107,43 @@ ReapThread::terminate()
{
list<ReapableItem> reap;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_terminated)
{
assert(_sessions.empty());
return;
}
_terminated = true;
- notify();
+ _condVar.notify_one();
reap.swap(_sessions);
- for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p)
+ for(const auto& conn : _connections)
{
- p->first->setCloseCallback(0);
- p->first->setHeartbeatCallback(0);
+ conn.first->setCloseCallback(nullptr);
+ conn.first->setHeartbeatCallback(nullptr);
}
_connections.clear();
- _closeCallback = 0;
- _heartbeatCallback = 0;
+ _closeCallback = nullptr;
+ _heartbeatCallback = nullptr;
}
- for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p)
+ for(const auto& r : reap)
{
- p->item->destroy(true);
+ r.item->destroy(true);
}
}
void
-ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection)
+ReapThread::join()
{
- Lock sync(*this);
+ _thread.join();
+}
+
+void
+ReapThread::add(const shared_ptr<Reapable>& reapable, chrono::seconds timeout,
+ const shared_ptr<Ice::Connection>& connection)
+{
+ lock_guard lock(_mutex);
if(_terminated)
{
return;
@@ -193,31 +157,26 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP
//
// 10 seconds is the minimum permissable timeout.
//
- if(timeout > 0 && timeout < 10)
+ if(timeout > 0s && timeout < 10s)
{
- timeout = 10;
+ timeout = 10s;
}
- ReapableItem item;
- item.item = reapable;
- item.connection = connection;
- item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout);
- _sessions.push_back(item);
+ _sessions.push_back({ reapable, connection, timeout });
if(connection)
{
- map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(connection);
+ auto p = _connections.find(connection);
if(p == _connections.end())
{
- p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first;
+ p = _connections.insert({connection, {} }).first;
connection->setCloseCallback(_closeCallback);
connection->setHeartbeatCallback(_heartbeatCallback);
-
}
p->second.insert(reapable);
}
- if(timeout > 0)
+ if(timeout > 0s)
{
//
// If there is a new minimum wake interval then wake the reaping
@@ -225,50 +184,52 @@ ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionP
//
if(calcWakeInterval())
{
- notify();
+ _condVar.notify_one();
}
//
// Since we just added a new session with a non null timeout there
// must be a non-zero wakeInterval.
//
- assert(_wakeInterval != IceUtil::Time());
+ assert(_wakeInterval != 0s);
}
}
void
-ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con)
+ReapThread::connectionHeartbeat(const shared_ptr<Ice::Connection>& con)
{
- Lock sync(*this);
- map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con);
+ lock_guard lock(_mutex);
+
+ auto p = _connections.find(con);
if(p == _connections.end())
{
- con->setCloseCallback(0);
- con->setHeartbeatCallback(0);
+ con->setCloseCallback(nullptr);
+ con->setHeartbeatCallback(nullptr);
return;
}
- for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ for(const auto& reapable : p->second)
{
- (*q)->heartbeat();
+ reapable->heartbeat();
}
}
void
-ReapThread::connectionClosed(const Ice::ConnectionPtr& con)
+ReapThread::connectionClosed(const shared_ptr<Ice::Connection>& con)
{
- Lock sync(*this);
- map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con);
+ lock_guard lock(_mutex);
+
+ auto p = _connections.find(con);
if(p == _connections.end())
{
- con->setCloseCallback(0);
- con->setHeartbeatCallback(0);
+ con->setCloseCallback(nullptr);
+ con->setHeartbeatCallback(nullptr);
return;
}
- for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ for(const auto& reapable : p->second)
{
- (*q)->destroy(false);
+ reapable->destroy(false);
}
_connections.erase(p);
}
@@ -281,18 +242,18 @@ bool
ReapThread::calcWakeInterval()
{
// Re-calculate minimum timeout
- IceUtil::Time oldWakeInterval = _wakeInterval;
- IceUtil::Time minimum;
+ auto oldWakeInterval = _wakeInterval;
+ chrono::milliseconds minimum = 0s;
bool first = true;
- for(list<ReapableItem>::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p)
+ for(const auto& session : _sessions)
{
- if(p->timeout != IceUtil::Time() && (first || p->timeout < minimum))
+ if(session.timeout != 0s && (first || session.timeout < minimum))
{
- minimum = p->timeout;
+ minimum = session.timeout;
first = false;
}
}
_wakeInterval = minimum;
- return oldWakeInterval == IceUtil::Time() || minimum < oldWakeInterval;
+ return oldWakeInterval == 0s || minimum < oldWakeInterval;
}