diff options
Diffstat (limited to 'cpp/src/IceGrid/ReapThread.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReapThread.cpp | 109 |
1 files changed, 106 insertions, 3 deletions
diff --git a/cpp/src/IceGrid/ReapThread.cpp b/cpp/src/IceGrid/ReapThread.cpp index 569e6f7a4b2..68eb1181d7d 100644 --- a/cpp/src/IceGrid/ReapThread.cpp +++ b/cpp/src/IceGrid/ReapThread.cpp @@ -13,8 +13,39 @@ using namespace std; using namespace IceGrid; +namespace +{ + +class ConnectionCallbackI : public Ice::ConnectionCallback +{ +public: + + ConnectionCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) + { + } + + virtual void + heartbeat(const Ice::ConnectionPtr& con) + { + _reaper->connectionHeartbeat(con); + } + + virtual void + closed(const Ice::ConnectionPtr& con) + { + _reaper->connectionClosed(con); + } + +private: + + const ReapThreadPtr _reaper; +}; + +} + ReapThread::ReapThread() : IceUtil::Thread("Icegrid reaper thread"), + _callback(new ConnectionCallbackI(this)), _terminated(false) { } @@ -60,21 +91,39 @@ ReapThread::run() { p->item->timestamp(); // This should throw if the reapable is destroyed. ++p; + continue; } else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout) { reap.push_back(*p); - p = _sessions.erase(p); } else { ++p; + continue; } } catch(const Ice::ObjectNotExistException&) { - p = _sessions.erase(p); } + + // + // Remove the reapable + // + if(p->connection) + { + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator q = _connections.find(p->connection); + if(q != _connections.end()) + { + q->second.erase(p->item); + if(q->second.empty()) + { + p->connection->setCallback(0); + _connections.erase(q); + } + } + } + p = _sessions.erase(p); } } @@ -100,6 +149,13 @@ ReapThread::terminate() _terminated = true; notify(); reap.swap(_sessions); + + for(map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.begin(); p != _connections.end(); ++p) + { + p->first->setCallback(0); + } + _connections.clear(); + _callback = 0; } for(list<ReapableItem>::iterator p = reap.begin(); p != reap.end(); ++p) @@ -109,7 +165,7 @@ ReapThread::terminate() } void -ReapThread::add(const ReapablePtr& reapable, int timeout) +ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection) { Lock sync(*this); if(_terminated) @@ -132,9 +188,21 @@ ReapThread::add(const ReapablePtr& reapable, int timeout) ReapableItem item; item.item = reapable; + item.connection = connection; item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout); _sessions.push_back(item); + if(connection) + { + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(connection); + if(p == _connections.end()) + { + p = _connections.insert(make_pair(connection, set<ReapablePtr>())).first; + connection->setCallback(_callback); + } + p->second.insert(reapable); + } + if(timeout > 0) { // @@ -154,6 +222,41 @@ ReapThread::add(const ReapablePtr& reapable, int timeout) } } +void +ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) +{ + Lock sync(*this); + map<Ice::ConnectionPtr, set<ReapablePtr> >::const_iterator p = _connections.find(con); + if(p == _connections.end()) + { + con->setCallback(0); + return; + } + + for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->heartbeat(); + } +} + +void +ReapThread::connectionClosed(const Ice::ConnectionPtr& con) +{ + Lock sync(*this); + map<Ice::ConnectionPtr, set<ReapablePtr> >::iterator p = _connections.find(con); + if(p == _connections.end()) + { + con->setCallback(0); + return; + } + + for(set<ReapablePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + { + (*q)->destroy(false); + } + _connections.erase(p); +} + // // Returns true if the calculated wake interval is less than the current wake // interval (or if the original wake interval was "forever"). |