// ********************************************************************** // // Copyright (c) 2003-present ZeroC, Inc. All rights reserved. // // ********************************************************************** #include #include using namespace std; using namespace IceGrid; namespace { class CloseCallbackI : public Ice::CloseCallback { public: CloseCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { } virtual void closed(const Ice::ConnectionPtr& con) { _reaper->connectionClosed(con); } private: const ReapThreadPtr _reaper; }; class HeartbeatCallbackI : public Ice::HeartbeatCallback { public: HeartbeatCallbackI(const ReapThreadPtr& reaper) : _reaper(reaper) { } virtual void heartbeat(const Ice::ConnectionPtr& con) { _reaper->connectionHeartbeat(con); } private: const ReapThreadPtr _reaper; }; } ReapThread::ReapThread() : IceUtil::Thread("Icegrid reaper thread"), _closeCallback(new CloseCallbackI(this)), _heartbeatCallback(new HeartbeatCallbackI(this)), _terminated(false) { } void ReapThread::run() { vector reap; while(true) { { Lock sync(*this); if(_terminated) { break; } calcWakeInterval(); // // If the wake interval is zero then we wait forever. // if(_wakeInterval == IceUtil::Time()) { wait(); } else { timedWait(_wakeInterval); } if(_terminated) { break; } list::iterator p = _sessions.begin(); while(p != _sessions.end()) { try { if(p->timeout == IceUtil::Time()) { p->item->timestamp(); // This should throw if the reapable is destroyed. ++p; continue; } else if((IceUtil::Time::now(IceUtil::Time::Monotonic) - p->item->timestamp()) > p->timeout) { reap.push_back(*p); } else { ++p; continue; } } catch(const Ice::ObjectNotExistException&) { } // // Remove the reapable // if(p->connection) { map >::iterator q = _connections.find(p->connection); if(q != _connections.end()) { q->second.erase(p->item); if(q->second.empty()) { p->connection->setCloseCallback(0); p->connection->setHeartbeatCallback(0); _connections.erase(q); } } } p = _sessions.erase(p); } } for(vector::const_iterator p = reap.begin(); p != reap.end(); ++p) { p->item->destroy(false); } reap.clear(); } } void ReapThread::terminate() { list reap; { Lock sync(*this); if(_terminated) { assert(_sessions.empty()); return; } _terminated = true; notify(); reap.swap(_sessions); for(map >::iterator p = _connections.begin(); p != _connections.end(); ++p) { p->first->setCloseCallback(0); p->first->setHeartbeatCallback(0); } _connections.clear(); _closeCallback = 0; _heartbeatCallback = 0; } for(list::iterator p = reap.begin(); p != reap.end(); ++p) { p->item->destroy(true); } } void ReapThread::add(const ReapablePtr& reapable, int timeout, const Ice::ConnectionPtr& connection) { Lock sync(*this); if(_terminated) { return; } // // NOTE: registering a reapable with a null timeout is allowed. The reapable is reaped // only when the reaper thread is shutdown. // // // 10 seconds is the minimum permissable timeout. // if(timeout > 0 && timeout < 10) { timeout = 10; } ReapableItem item; item.item = reapable; item.connection = connection; item.timeout = timeout == 0 ? IceUtil::Time() : IceUtil::Time::seconds(timeout); _sessions.push_back(item); if(connection) { map >::iterator p = _connections.find(connection); if(p == _connections.end()) { p = _connections.insert(make_pair(connection, set())).first; connection->setCloseCallback(_closeCallback); connection->setHeartbeatCallback(_heartbeatCallback); } p->second.insert(reapable); } if(timeout > 0) { // // If there is a new minimum wake interval then wake the reaping // thread. // if(calcWakeInterval()) { notify(); } // // Since we just added a new session with a non null timeout there // must be a non-zero wakeInterval. // assert(_wakeInterval != IceUtil::Time()); } } void ReapThread::connectionHeartbeat(const Ice::ConnectionPtr& con) { Lock sync(*this); map >::const_iterator p = _connections.find(con); if(p == _connections.end()) { con->setCloseCallback(0); con->setHeartbeatCallback(0); return; } for(set::const_iterator q = p->second.begin(); q != p->second.end(); ++q) { (*q)->heartbeat(); } } void ReapThread::connectionClosed(const Ice::ConnectionPtr& con) { Lock sync(*this); map >::iterator p = _connections.find(con); if(p == _connections.end()) { con->setCloseCallback(0); con->setHeartbeatCallback(0); return; } for(set::const_iterator q = p->second.begin(); q != p->second.end(); ++q) { (*q)->destroy(false); } _connections.erase(p); } // // Returns true if the calculated wake interval is less than the current wake // interval (or if the original wake interval was "forever"). // bool ReapThread::calcWakeInterval() { // Re-calculate minimum timeout IceUtil::Time oldWakeInterval = _wakeInterval; IceUtil::Time minimum; bool first = true; for(list::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) { if(p->timeout != IceUtil::Time() && (first || p->timeout < minimum)) { minimum = p->timeout; first = false; } } _wakeInterval = minimum; return oldWakeInterval == IceUtil::Time() || minimum < oldWakeInterval; }