summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReapThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReapThread.cpp')
-rw-r--r--cpp/src/IceGrid/ReapThread.cpp109
1 files changed, 106 insertions, 3 deletions
diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp
index 569e6f7a4b2..68eb1181d7d 100644
--- a/cpp/src/IceGrid/ReapThread.cpp
+++ b/cpp/src/IceGrid/ReapThread.cpp
@@ -13,8 +13,39 @@
using namespace std;
using namespace IceGrid;
+namespace
+{
+
+class ConnectionCallbackI : public Ice::ConnectionCallback
+{
+public:
+
+ ConnectionCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper)
+ {
+ }
+
+ virtual void
+ heartbeat(const Ice::ConnectionPtr& con)
+ {
+ _reaper->connectionHeartbeat(con);
+ }
+
+ virtual void
+ closed(const Ice::ConnectionPtr& con)
+ {
+ _reaper->connectionClosed(con);
+ }
+
+private:
+
+ const ReapThreadPtr _reaper;
+};
+
+}
+
ReapThread::ReapThread() :
IceUtil::Thread("Icegrid reaper thread"),
+ _callback(new ConnectionCallbackI(this)),
_terminated(false)
{
}
@@ -60,21 +91,39 @@ ReapThread::run()
{
p->item->timestamp(); // This should throw if the reapable is destroyed.
++p;
+ continue;
}
else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout)
{
reap.push_back(*p);
- p = _sessions.erase(p);
}
else
{
++p;
+ continue;
}
}
catch(const Ice::ObjectNotExistException&)
{
- p = _sessions.erase(p);
}
+
+ //
+ // Remove the reapable
+ //
+ if(p->connection)
+ {
+ map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator q = _connections.find(p->connection);
+ if(q != _connections.end())
+ {
+ q->second.erase(p->item);
+ if(q->second.empty())
+ {
+ p->connection->setCallback(0);
+ _connections.erase(q);
+ }
+ }
+ }
+ p = _sessions.erase(p);
}
}
@@ -100,6 +149,13 @@ ReapThread::terminate()
_terminated = true;
notify();
reap.swap(_sessions);
+
+ for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p)
+ {
+ p->first->setCallback(0);
+ }
+ _connections.clear();
+ _callback = 0;
}
for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p)
@@ -109,7 +165,7 @@ ReapThread::terminate()
}
void
-ReapThread::add(const ReapablePtr& reapable, int timeout)
+ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection)
{
Lock sync(*this);
if(_terminated)
@@ -132,9 +188,21 @@ ReapThread::add(const ReapablePtr& reapable, int timeout)
ReapableItem item;
item.item = reapable;
+ item.connection = connection;
item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout);
_sessions.push_back(item);
+ if(connection)
+ {
+ map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(connection);
+ if(p == _connections.end())
+ {
+ p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first;
+ connection->setCallback(_callback);
+ }
+ p->second.insert(reapable);
+ }
+
if(timeout > 0)
{
//
@@ -154,6 +222,41 @@ ReapThread::add(const ReapablePtr& reapable, int timeout)
}
}
+void
+ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con)
+{
+ Lock sync(*this);
+ map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con);
+ if(p == _connections.end())
+ {
+ con->setCallback(0);
+ return;
+ }
+
+ for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ {
+ (*q)->heartbeat();
+ }
+}
+
+void
+ReapThread::connectionClosed(const Ice::ConnectionPtr& con)
+{
+ Lock sync(*this);
+ map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con);
+ if(p == _connections.end())
+ {
+ con->setCallback(0);
+ return;
+ }
+
+ for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ {
+ (*q)->destroy(false);
+ }
+ _connections.erase(p);
+}
+
//
// Returns true if the calculated wake interval is less than the current wake
// interval (or if the original wake interval was "forever").