diff options
Diffstat (limited to 'cpp/src/IceStorm/NodeI.cpp')
-rw-r--r-- | cpp/src/IceStorm/NodeI.cpp | 358 |
1 files changed, 167 insertions, 191 deletions
diff --git a/cpp/src/IceStorm/NodeI.cpp b/cpp/src/IceStorm/NodeI.cpp index 48563f2f701..794b04761fb 100644 --- a/cpp/src/IceStorm/NodeI.cpp +++ b/cpp/src/IceStorm/NodeI.cpp @@ -16,11 +16,11 @@ namespace class CheckTask : public IceUtil::TimerTask { - const NodeIPtr _node; + const shared_ptr<NodeI> _node; public: - CheckTask(const NodeIPtr& node) : _node(node) { } + CheckTask(shared_ptr<NodeI> node) : _node(move(node)) { } virtual void runTimerTask() { _node->check(); @@ -29,12 +29,12 @@ public: class MergeTask : public IceUtil::TimerTask { - const NodeIPtr _node; + const shared_ptr<NodeI> _node; const set<int> _s; public: - MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s) { } + MergeTask(shared_ptr<NodeI> node, const set<int>& s) : _node(move(node)), _s(s) { } virtual void runTimerTask() { _node->merge(_s); @@ -43,11 +43,11 @@ public: class MergeContinueTask : public IceUtil::TimerTask { - const NodeIPtr _node; + const shared_ptr<NodeI> _node; public: - MergeContinueTask(const NodeIPtr& node) : _node(node) { } + MergeContinueTask(shared_ptr<NodeI> node) : _node(move(node)) { } virtual void runTimerTask() { _node->mergeContinue(); @@ -56,11 +56,11 @@ public: class TimeoutTask: public IceUtil::TimerTask { - const NodeIPtr _node; + const shared_ptr<NodeI> _node; public: - TimeoutTask(const NodeIPtr& node) : _node(node) { } + TimeoutTask(shared_ptr<NodeI> node) : _node(move(node)) { } virtual void runTimerTask() { _node->timeout(); @@ -81,8 +81,8 @@ GroupNodeInfo::GroupNodeInfo(int i) : { } -GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) : - id(i), llu(l), observer(o) +GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, shared_ptr<Ice::ObjectPrx> o) : + id(i), llu(l), observer(move(o)) { } @@ -98,34 +98,22 @@ GroupNodeInfo::operator==(const GroupNodeInfo& rhs) const return id == rhs.id; } -// -// COMPILER FIX: Clang using libc++ requires to define operator= -// -#if defined(__clang__) && defined(_LIBCPP_VERSION) -GroupNodeInfo& -GroupNodeInfo::operator=(const GroupNodeInfo& other) - -{ - const_cast<int&>(this->id) = other.id; - const_cast<LogUpdate&>(this->llu) = other.llu; - const_cast<Ice::ObjectPrx&>(this->observer) = other.observer; - return *this; -} -#endif - namespace { -static IceUtil::Time -getTimeout(const string& key, int def, const Ice::PropertiesPtr& properties, const TraceLevelsPtr& traceLevels) +static chrono::seconds +getTimeout(const shared_ptr<Instance> instance, const string& key, int def) { - int t = properties->getPropertyAsIntWithDefault(key, def); - if(t < 0) + auto properties = instance->communicator()->getProperties(); + auto traceLevels = instance->traceLevels(); + + auto t = chrono::seconds(properties->getPropertyAsIntWithDefault(key, def)); + if(t < 0s) { Ice::Warning out(traceLevels->logger); out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1"; - t = 1; + t = 1s; } - return IceUtil::Time::seconds(t); + return t; } static string @@ -147,37 +135,31 @@ toString(const set<int>& s) } -NodeI::NodeI(const InstancePtr& instance, - const ReplicaPtr& replica, - const Ice::ObjectPrx& replicaProxy, - int id, const map<int, NodePrx>& nodes) : +NodeI::NodeI(const shared_ptr<Instance>& instance, + shared_ptr<Replica> replica, + shared_ptr<Ice::ObjectPrx> replicaProxy, + int id, const map<int, shared_ptr<NodePrx>>& nodes) : _timer(instance->timer()), _traceLevels(instance->traceLevels()), _observers(instance->observers()), - _replica(replica), - _replicaProxy(replicaProxy), + _replica(move(replica)), + _replicaProxy(move(replicaProxy)), _id(id), _nodes(nodes), - _state(NodeStateInactive), + _masterTimeout(getTimeout(instance, instance->serviceName() + ".Election.MasterTimeout", 10)), + _electionTimeout(getTimeout(instance, instance->serviceName() + ".Election.ElectionTimeout", 10)), + _mergeTimeout(getTimeout(instance, instance->serviceName() + ".Election.ResponseTimeout", 10)), + _state(NodeState::NodeStateInactive), _updateCounter(0), _max(0), _generation(-1), _destroy(false) { - map<int, NodePrx> oneway; - for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + for(const auto& node : _nodes) { - oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway()); + auto prx = Ice::uncheckedCast<NodePrx>(node.second->ice_oneway()); + const_cast<map<int, shared_ptr<NodePrx>>& >(_nodesOneway)[node.first] = move(prx); } - const_cast<map<int, NodePrx>& >(_nodesOneway) = oneway; - - Ice::PropertiesPtr properties = instance->communicator()->getProperties(); - const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout( - instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels); - const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout( - instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels); - const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout( - instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels); } void @@ -204,9 +186,9 @@ NodeI::start() // We use this lock to ensure that recovery is called before CheckTask // is scheduled, even if timeout is 0 // - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - _checkTask = new CheckTask(this); + _checkTask = make_shared<CheckTask>(shared_from_this()); _timer->schedule(_checkTask, IceUtil::Time::seconds(static_cast<IceUtil::Int64>(_nodes.size() - static_cast<size_t>(_id)) * 2)); recovery(); @@ -216,17 +198,17 @@ void NodeI::check() { { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(_destroy) { return; } assert(!_mergeTask); - if(_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id) + if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization || _coord != _id) { assert(_checkTask); - _timer->schedule(_checkTask, _electionTimeout); + _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count())); return; } @@ -236,15 +218,15 @@ NodeI::check() _observers->getReapedSlaves(dead); if(!dead.empty()) { - for(vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p) + for(const auto& node : dead) { - set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p)); + auto q = _up.find(GroupNodeInfo(node)); if(q != _up.end()) { if(_traceLevels->election > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); - out << "node " << _id << ": reaping slave " << *p; + out << "node " << _id << ": reaping slave " << node; } _up.erase(q); } @@ -272,21 +254,21 @@ NodeI::check() // See if other groups exist for possible merge. set<int> tmpset; int max = -1; - for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + for(const auto& node : _nodes) { - if(p->first == _id) + if(node.first == _id) { continue; } try { - if(p->second->areYouCoordinator()) + if(node.second->areYouCoordinator()) { - if(p->first > max) + if(node.first > max) { - max = p->first; + max = node.first; } - tmpset.insert(p->first); + tmpset.insert(node.first); } } catch(const Ice::Exception& ex) @@ -294,18 +276,18 @@ NodeI::check() if(_traceLevels->election > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); - out << "node " << _id << ": call on node " << p->first << " failed: " << ex; + out << "node " << _id << ": call on node " << node.first << " failed: " << ex; } } } - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); // If the node state has changed while the mutex has been released // then bail. We don't schedule a re-check since we're either // destroyed in which case we're going to terminate or the end of // the election/reorg will re-schedule the check. - if(_destroy || _state == NodeStateElection || _state == NodeStateReorganization || _coord != _id) + if(_destroy || _state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization || _coord != _id) { _checkTask = 0; return; @@ -316,7 +298,7 @@ NodeI::check() if(tmpset.empty()) { assert(_checkTask); - _timer->schedule(_checkTask, _electionTimeout); + _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count())); return; } @@ -329,22 +311,21 @@ NodeI::check() out << "node " << _id << ": highest priority node count: " << max; } - IceUtil::Time delay = IceUtil::Time::seconds(0); + chrono::seconds delay = 0s; if(_id < max) { - // Reschedule timer proportial to p-i. + // Reschedule timer proportional to p-i. delay = _mergeTimeout + _mergeTimeout * (max - _id); if(_traceLevels->election > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); - out << "node " << _id << ": scheduling merge in " << delay.toDuration() - << " seconds"; + out << "node " << _id << ": scheduling merge in " << delay.count() << " seconds"; } } assert(!_mergeTask); - _mergeTask = new MergeTask(this, tmpset); - _timer->schedule(_mergeTask, delay); + _mergeTask = make_shared<MergeTask>(shared_from_this(), tmpset); + _timer->schedule(_mergeTask, IceUtil::Time::seconds(delay.count())); } // Called if the node has not heard from the coordinator in some time. @@ -354,7 +335,7 @@ NodeI::timeout() int myCoord; string myGroup; { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); // If we're destroyed or we are our own coordinator then we're // done. if(_destroy || _coord == _id) @@ -368,7 +349,7 @@ NodeI::timeout() bool failed = false; try { - map<int, NodePrx>::const_iterator p = _nodes.find(myCoord); + auto p = _nodes.find(myCoord); assert(p != _nodes.end()); if(!p->second->areYouThere(myGroup, _id)) { @@ -402,24 +383,26 @@ NodeI::merge(const set<int>& coordinatorSet) set<int> invited; string gp; { - Lock sync(*this); + unique_lock<recursive_mutex> lock(_mutex); + _mergeTask = 0; // If the node is currently in an election, or reorganizing // then we're done. - if(_state == NodeStateElection || _state == NodeStateReorganization) + if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization) { return; } // This state change prevents this node from accepting // invitations while the merge is executing. - setState(NodeStateElection); + setState(NodeState::NodeStateElection); // No more replica changes are permitted. while(!_destroy && _updateCounter > 0) { - wait(); + // The recursive mutex (_mutex) must only be locked once by this tread + _condVar.wait(lock); } if(_destroy) { @@ -437,9 +420,9 @@ NodeI::merge(const set<int>& coordinatorSet) // Construct a set of node ids to invite. This is the union of // _up and set of coordinators gathered in the check stage. invited = coordinatorSet; - for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p) + for(const auto& node : _up) { - invited.insert(p->id); + invited.insert(node.id); } _coord = _id; @@ -462,7 +445,7 @@ NodeI::merge(const set<int>& coordinatorSet) Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); out << "node " << _id << ": inviting node " << *p << " to group " << gp; } - map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p); + auto node = _nodesOneway.find(*p); assert(node != _nodesOneway.end()); node->second->invitation(_id, gp); ++p; @@ -475,7 +458,7 @@ NodeI::merge(const set<int>& coordinatorSet) // Now we wait for responses to our invitation. { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(_destroy) { return; @@ -492,17 +475,17 @@ NodeI::merge(const set<int>& coordinatorSet) // Schedule the mergeContinueTask. assert(_mergeContinueTask == 0); - _mergeContinueTask = new MergeContinueTask(this); + _mergeContinueTask = make_shared<MergeContinueTask>(shared_from_this()); // At this point we may have already accepted all of the // invitations, if so then we want to schedule the // mergeContinue immediately. - IceUtil::Time timeout = _mergeTimeout; + chrono::seconds timeout = _mergeTimeout; if(_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted) { - timeout = IceUtil::Time::seconds(0); + timeout = 0s; } - _timer->schedule(_mergeContinueTask, timeout); + _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(timeout.count())); } } @@ -512,7 +495,7 @@ NodeI::mergeContinue() string gp; set<GroupNodeInfo> tmpSet; { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(_destroy) { return; @@ -520,14 +503,14 @@ NodeI::mergeContinue() // Copy variables for thread safety. gp = _group; - tmpSet = _up; + tmpSet = set<GroupNodeInfo>(_up); assert(_mergeContinueTask); _mergeContinueTask = 0; // The node is now reorganizing. - assert(_state == NodeStateElection); - setState(NodeStateReorganization); + assert(_state == NodeState::NodeStateElection); + setState(NodeState::NodeStateReorganization); if(_traceLevels->election > 0) { @@ -562,18 +545,18 @@ NodeI::mergeContinue() // updates. int maxid = -1; LogUpdate maxllu = { -1, 0 }; - for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p) + for(const auto& n : tmpSet) { if(_traceLevels->election > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); - out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration; + out << "node id=" << n.id << " llu=" << n.llu.generation << "/" << n.llu.iteration; } - if(p->llu.generation > maxllu.generation || - (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration)) + if(n.llu.generation > maxllu.generation || + (n.llu.generation == maxllu.generation && n.llu.iteration > maxllu.iteration)) { - maxid = p->id; - maxllu = p->llu; + maxid = n.id; + maxllu = n.llu; } } @@ -596,7 +579,7 @@ NodeI::mergeContinue() } try { - map<int, NodePrx>::const_iterator node = _nodes.find(maxid); + auto node = _nodes.find(maxid); assert(node != _nodes.end()); _replica->sync(node->second->sync()); } @@ -625,7 +608,7 @@ NodeI::mergeContinue() // state, as such we can set our _max flag. unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1; { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(max > _max) { _max = max; @@ -655,11 +638,11 @@ NodeI::mergeContinue() } // Tell each node to go. - for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p) + for(const auto& n : tmpSet) { try { - map<int, NodePrx>::const_iterator node = _nodes.find(p->id); + auto node = _nodes.find(n.id); assert(node != _nodes.end()); node->second->ready(_id, gp, _replicaProxy, static_cast<Ice::Int>(max), maxllu.generation); } @@ -668,7 +651,7 @@ NodeI::mergeContinue() if(_traceLevels->election > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat); - out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex; + out << "node " << _id << ": error calling ready on " << n.id << " ex: " << ex; } recovery(); return; @@ -676,7 +659,7 @@ NodeI::mergeContinue() } { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(_destroy) { return; @@ -688,19 +671,19 @@ NodeI::mergeContinue() out << "replication commencing with " << _up.size()+1 << "/" << _nodes.size() << " nodes with llu generation: " << maxllu.generation; } - setState(NodeStateNormal); + setState(NodeState::NodeStateNormal); _coordinatorProxy = 0; _generation = maxllu.generation; assert(!_checkTask); - _checkTask = new CheckTask(this); - _timer->schedule(_checkTask, _electionTimeout); + _checkTask = make_shared<CheckTask>(shared_from_this()); + _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count())); } } void -NodeI::invitation(int j, const string& gn, const Ice::Current&) +NodeI::invitation(int j, string gn, const Ice::Current&) { if(_traceLevels->election > 0) { @@ -720,14 +703,14 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) int max = -1; set<GroupNodeInfo> tmpSet; { - Lock sync(*this); + unique_lock<recursive_mutex> lock(_mutex); if(_destroy) { return; } // If we're in the election or reorg state a merge has already // started, so ignore the invitation. - if(_state == NodeStateElection || _state == NodeStateReorganization) + if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization) { if(_traceLevels->election > 0) { @@ -751,18 +734,18 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) // The merge task is cleared in the merge. This // ensures two invitations cannot cause a race with // the merge. - //_mergeTask = 0; return; } - _mergeTask = 0; + _mergeTask = nullptr; } // We're now joining with another group. If we are active we // must stop serving as a master or slave. - setState(NodeStateElection); + setState(NodeState::NodeStateElection); while(!_destroy && _updateCounter > 0) { - wait(); + // The recursive mutex (_mutex) must only be locked once by this tread + _condVar.wait(lock); } if(_destroy) { @@ -770,7 +753,7 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) } tmpCoord = _coord; - tmpSet = _up; + tmpSet = set<GroupNodeInfo>(_up); _coord = j; _group = gn; @@ -780,14 +763,14 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) Ice::IntSeq forwardedInvites; if(tmpCoord == _id) // Forward invitation to my old members. { - for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p) + for(const auto& n : tmpSet) { try { - map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id); + auto node = _nodesOneway.find(n.id); assert(node != _nodesOneway.end()); node->second->invitation(j, gn); - forwardedInvites.push_back(p->id); + forwardedInvites.push_back(n.id); } catch(const Ice::Exception&) { @@ -800,23 +783,24 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) // everything is fine. Setting the state *after* calling accept // can cause a race. { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); if(_destroy) { return; } - assert(_state == NodeStateElection); - setState(NodeStateReorganization); + + assert(_state == NodeState::NodeStateElection); + setState(NodeState::NodeStateReorganization); if(!_timeoutTask) { - _timeoutTask = new TimeoutTask(this); - _timer->scheduleRepeated(_timeoutTask, _masterTimeout); + _timeoutTask = make_shared<TimeoutTask>(shared_from_this()); + _timer->scheduleRepeated(_timeoutTask, IceUtil::Time::seconds(_masterTimeout.count())); } } try { - map<int, NodePrx>::const_iterator node = _nodesOneway.find(j); + auto node = _nodesOneway.find(j); assert(node != _nodesOneway.end()); node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max); } @@ -828,11 +812,11 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&) } void -NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max, Ice::Long generation, +NodeI::ready(int j, string gn, shared_ptr<Ice::ObjectPrx> coordinator, int max, Ice::Long generation, const Ice::Current&) { - Lock sync(*this); - if(!_destroy && _state == NodeStateReorganization && _group == gn) + lock_guard<recursive_mutex> lg(_mutex); + if(!_destroy && _state == NodeState::NodeStateReorganization && _group == gn) { // The coordinator must be j (this was set in the invitation). if(_coord != j) @@ -859,20 +843,20 @@ NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max // Activate the replica here since the replica is now ready // for duty. - setState(NodeStateNormal); + setState(NodeState::NodeStateNormal); _coordinatorProxy = coordinator; if(!_checkTask) { - _checkTask = new CheckTask(this); - _timer->schedule(_checkTask, _electionTimeout); + _checkTask = make_shared<CheckTask>(shared_from_this()); + _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count())); } } } void -NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, const Ice::ObjectPrx& observer, - const LogUpdate& llu, int max, const Ice::Current&) +NodeI::accept(int j, string gn, Ice::IntSeq forwardedInvites, shared_ptr<Ice::ObjectPrx> observer, LogUpdate llu, + int max, const Ice::Current&) { // Verify that j exists in our node set. if(_nodes.find(j) == _nodes.end()) @@ -882,8 +866,8 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons return; } - Lock sync(*this); - if(!_destroy && _state == NodeStateElection && _group == gn && _coord == _id) + lock_guard<recursive_mutex> lg(_mutex); + if(!_destroy && _state == NodeState::NodeStateElection && _group == gn && _coord == _id) { _up.insert(GroupNodeInfo(j, llu, observer)); @@ -928,7 +912,7 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons // merge continue immediately. Otherwise, we let the existing // merge() schedule continue. if((_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted) && - _mergeContinueTask && _timer->cancel(_mergeContinueTask)) + _mergeContinueTask && _timer->cancel(_mergeContinueTask)) { _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0)); } @@ -938,18 +922,18 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons bool NodeI::areYouCoordinator(const Ice::Current&) const { - Lock sync(*this); - return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id; + lock_guard<recursive_mutex> lg(_mutex); + return _state != NodeState::NodeStateElection && _state != NodeState::NodeStateReorganization && _coord == _id; } bool -NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const +NodeI::areYouThere(string gn, int j, const Ice::Current&) const { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end(); } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> NodeI::sync(const Ice::Current&) const { return _replica->getSync(); @@ -959,12 +943,9 @@ NodeInfoSeq NodeI::nodes(const Ice::Current&) const { NodeInfoSeq seq; - for(map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q) + for(const auto& n : _nodes) { - NodeInfo ni; - ni.id = q->first; - ni.n = q->second; - seq.push_back(ni); + seq.push_back({ n.first, n.second }); } return seq; @@ -973,21 +954,18 @@ NodeI::nodes(const Ice::Current&) const QueryInfo NodeI::query(const Ice::Current&) const { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); QueryInfo info; info.id = _id; info.coord = _coord; info.group = _group; info.replica = _replicaProxy; info.state = _state; - info.max = static_cast<Ice::Int>(_max); + info.max = static_cast<int>(_max); - for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p) + for(const auto& gni : _up) { - GroupInfo gi; - gi.id = p->id; - gi.llu = p->llu; - info.up.push_back(gi); + info.up.push_back( { gni.id, gni.llu }); } return info; @@ -996,7 +974,7 @@ NodeI::query(const Ice::Current&) const void NodeI::recovery(Ice::Long generation) { - Lock sync(*this); + unique_lock<recursive_mutex> lock(_mutex); // Ignore the recovery if the node has already advanced a // generation. @@ -1005,10 +983,10 @@ NodeI::recovery(Ice::Long generation) return; } - setState(NodeStateInactive); + setState(NodeState::NodeStateInactive); while(!_destroy && _updateCounter > 0) { - wait(); + _condVar.wait(lock); } if(_destroy) { @@ -1042,23 +1020,23 @@ NodeI::recovery(Ice::Long generation) } if(!_checkTask) { - _checkTask = new CheckTask(this); - _timer->schedule(_checkTask, _electionTimeout); + _checkTask = make_shared<CheckTask>(shared_from_this()); + _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count())); } } void NodeI::destroy() { - Lock sync(*this); + unique_lock<recursive_mutex> lock(_mutex); assert(!_destroy); while(_updateCounter > 0) { - wait(); + _condVar.wait(lock); } _destroy = true; - notifyAll(); + _condVar.notify_all(); // Cancel the timers. if(_checkTask) @@ -1083,10 +1061,10 @@ NodeI::destroy() // A node should only receive an observer init call if the node is // reorganizing and its not the coordinator. void -NodeI::checkObserverInit(Ice::Long /*generation*/) +NodeI::checkObserverInit(Ice::Long) { - Lock sync(*this); - if(_state != NodeStateReorganization) + lock_guard<recursive_mutex> lg(_mutex); + if(_state != NodeState::NodeStateReorganization) { throw ObserverInconsistencyException("init cannot block when state != NodeStateReorganization"); } @@ -1097,23 +1075,21 @@ NodeI::checkObserverInit(Ice::Long /*generation*/) } // Notify the node that we're about to start an update. -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> NodeI::startUpdate(Ice::Long& generation, const char* file, int line) { bool majority = _observers->check(); - Lock sync(*this); + unique_lock<recursive_mutex> lock(_mutex); // If we've actively replicating & lost the majority of our replicas then recover. - if(!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority) + if(!_coordinatorProxy && !_destroy && _state == NodeState::NodeStateNormal && !majority) { recovery(); } - while(!_destroy && _state != NodeStateNormal) - { - wait(); - } + _condVar.wait(lock, [this] { return _destroy || _state == NodeState::NodeStateNormal; } ); + if(_destroy) { throw Ice::UnknownException(file, line); @@ -1127,11 +1103,11 @@ NodeI::startUpdate(Ice::Long& generation, const char* file, int line) } bool -NodeI::updateMaster(const char* /*file*/, int /*line*/) +NodeI::updateMaster(const char*, int) { bool majority = _observers->check(); - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); // If the node is destroyed, or is not a coordinator then we're // done. @@ -1141,13 +1117,13 @@ NodeI::updateMaster(const char* /*file*/, int /*line*/) } // If we've lost the majority of our replicas then recover. - if(_state == NodeStateNormal && !majority) + if(_state == NodeState::NodeStateNormal && !majority) { recovery(); } // If we're not replicating then we're done. - if(_state != NodeStateNormal) + if(_state != NodeState::NodeStateNormal) { return false; } @@ -1157,14 +1133,13 @@ NodeI::updateMaster(const char* /*file*/, int /*line*/) return true; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> NodeI::startCachedRead(Ice::Long& generation, const char* file, int line) { - Lock sync(*this); - while(!_destroy && _state != NodeStateNormal) - { - wait(); - } + unique_lock<recursive_mutex> lock(_mutex); + + _condVar.wait(lock, [this] { return _destroy || _state == NodeState::NodeStateNormal; }); + if(_destroy) { throw Ice::UnknownException(file, line); @@ -1177,12 +1152,13 @@ NodeI::startCachedRead(Ice::Long& generation, const char* file, int line) void NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); + if(_destroy) { throw Ice::UnknownException(file, line); } - if(_state != NodeStateNormal) + if(_state != NodeState::NodeStateNormal) { throw ObserverInconsistencyException("update called on inactive node"); } @@ -1200,13 +1176,13 @@ NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line) void NodeI::finishUpdate() { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); assert(!_destroy); --_updateCounter; assert(_updateCounter >= 0); if(_updateCounter == 0) { - notifyAll(); + _condVar.notify_all(); } } @@ -1217,13 +1193,13 @@ stateToString(NodeState s) { switch(s) { - case NodeStateInactive: + case NodeState::NodeStateInactive: return "inactive"; - case NodeStateElection: + case NodeState::NodeStateElection: return "election"; - case NodeStateReorganization: + case NodeState::NodeStateReorganization: return "reorganization"; - case NodeStateNormal: + case NodeState::NodeStateNormal: return "normal"; } return "unknown"; @@ -1241,10 +1217,10 @@ NodeI::setState(NodeState s) out << "node " << _id << ": transition from " << stateToString(_state) << " to " << stateToString(s); } - _state = s; - if(_state == NodeStateNormal) + _state = move(s); + if(_state == NodeState::NodeStateNormal) { - notifyAll(); + _condVar.notify_all(); } } } |