path: root/cpp/src/IceStorm/NodeI.cpp
Diffstat (limited to 'cpp/src/IceStorm/NodeI.cpp')
-rw-r--r--  cpp/src/IceStorm/NodeI.cpp  358
1 file changed, 167 insertions(+), 191 deletions(-)
diff --git a/cpp/src/IceStorm/NodeI.cpp b/cpp/src/IceStorm/NodeI.cpp
index 48563f2f701..794b04761fb 100644
--- a/cpp/src/IceStorm/NodeI.cpp
+++ b/cpp/src/IceStorm/NodeI.cpp
@@ -16,11 +16,11 @@ namespace
class CheckTask : public IceUtil::TimerTask
{
- const NodeIPtr _node;
+ const shared_ptr<NodeI> _node;
public:
- CheckTask(const NodeIPtr& node) : _node(node) { }
+ CheckTask(shared_ptr<NodeI> node) : _node(move(node)) { }
virtual void runTimerTask()
{
_node->check();
@@ -29,12 +29,12 @@ public:
class MergeTask : public IceUtil::TimerTask
{
- const NodeIPtr _node;
+ const shared_ptr<NodeI> _node;
const set<int> _s;
public:
- MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s) { }
+ MergeTask(shared_ptr<NodeI> node, const set<int>& s) : _node(move(node)), _s(s) { }
virtual void runTimerTask()
{
_node->merge(_s);
@@ -43,11 +43,11 @@ public:
class MergeContinueTask : public IceUtil::TimerTask
{
- const NodeIPtr _node;
+ const shared_ptr<NodeI> _node;
public:
- MergeContinueTask(const NodeIPtr& node) : _node(node) { }
+ MergeContinueTask(shared_ptr<NodeI> node) : _node(move(node)) { }
virtual void runTimerTask()
{
_node->mergeContinue();
@@ -56,11 +56,11 @@ public:
class TimeoutTask: public IceUtil::TimerTask
{
- const NodeIPtr _node;
+ const shared_ptr<NodeI> _node;
public:
- TimeoutTask(const NodeIPtr& node) : _node(node) { }
+ TimeoutTask(shared_ptr<NodeI> node) : _node(move(node)) { }
virtual void runTimerTask()
{
_node->timeout();
@@ -81,8 +81,8 @@ GroupNodeInfo::GroupNodeInfo(int i) :
{
}
-GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
- id(i), llu(l), observer(o)
+GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, shared_ptr<Ice::ObjectPrx> o) :
+ id(i), llu(l), observer(move(o))
{
}
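
The constructor change above follows the sink-argument idiom used throughout this
patch (also in the timer tasks and the NodeI constructor below): the shared_ptr is
taken by value and moved into the member, so callers can pass either an lvalue
(copied) or an rvalue (moved). A minimal sketch with illustrative names, not types
from this file:

// Sketch of the sink-argument idiom; Holder and Observer are stand-ins.
#include <memory>
#include <utility>

struct Observer { };

struct Holder
{
    const std::shared_ptr<Observer> observer;

    // Take by value, then move into the const member.
    explicit Holder(std::shared_ptr<Observer> o) : observer(std::move(o)) { }
};
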
@@ -98,34 +98,22 @@ GroupNodeInfo::operator==(const GroupNodeInfo& rhs) const
return id == rhs.id;
}
-//
-// COMPILER FIX: Clang using libc++ requires to define operator=
-//
-#if defined(__clang__) && defined(_LIBCPP_VERSION)
-GroupNodeInfo&
-GroupNodeInfo::operator=(const GroupNodeInfo& other)
-
-{
- const_cast<int&>(this->id) = other.id;
- const_cast<LogUpdate&>(this->llu) = other.llu;
- const_cast<Ice::ObjectPrx&>(this->observer) = other.observer;
- return *this;
-}
-#endif
-
namespace
{
-static IceUtil::Time
-getTimeout(const string& key, int def, const Ice::PropertiesPtr& properties, const TraceLevelsPtr& traceLevels)
+static chrono::seconds
+getTimeout(const shared_ptr<Instance> instance, const string& key, int def)
{
- int t = properties->getPropertyAsIntWithDefault(key, def);
- if(t < 0)
+ auto properties = instance->communicator()->getProperties();
+ auto traceLevels = instance->traceLevels();
+
+ auto t = chrono::seconds(properties->getPropertyAsIntWithDefault(key, def));
+ if(t < 0s)
{
Ice::Warning out(traceLevels->logger);
out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1";
- t = 1;
+ t = 1s;
}
- return IceUtil::Time::seconds(t);
+ return t;
}
static string
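
A quick aside on the timeout handling above: the election timeouts are now held as
std::chrono::seconds, but IceUtil::Timer still takes an IceUtil::Time, which is why
the schedule calls below wrap each duration as IceUtil::Time::seconds(d.count()).
A hypothetical helper (not part of this commit) could centralize that conversion:

// Hypothetical helper, not in this commit: converts the chrono-based timeouts
// introduced above into the IceUtil::Time the timer still expects.
#include <chrono>
#include <IceUtil/Time.h>

static IceUtil::Time
toIceTime(std::chrono::seconds d)
{
    return IceUtil::Time::seconds(d.count());
}

// Usage sketch: _timer->schedule(_checkTask, toIceTime(_electionTimeout));
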
@@ -147,37 +135,31 @@ toString(const set<int>& s)
}
-NodeI::NodeI(const InstancePtr& instance,
- const ReplicaPtr& replica,
- const Ice::ObjectPrx& replicaProxy,
- int id, const map<int, NodePrx>& nodes) :
+NodeI::NodeI(const shared_ptr<Instance>& instance,
+ shared_ptr<Replica> replica,
+ shared_ptr<Ice::ObjectPrx> replicaProxy,
+ int id, const map<int, shared_ptr<NodePrx>>& nodes) :
_timer(instance->timer()),
_traceLevels(instance->traceLevels()),
_observers(instance->observers()),
- _replica(replica),
- _replicaProxy(replicaProxy),
+ _replica(move(replica)),
+ _replicaProxy(move(replicaProxy)),
_id(id),
_nodes(nodes),
- _state(NodeStateInactive),
+ _masterTimeout(getTimeout(instance, instance->serviceName() + ".Election.MasterTimeout", 10)),
+ _electionTimeout(getTimeout(instance, instance->serviceName() + ".Election.ElectionTimeout", 10)),
+ _mergeTimeout(getTimeout(instance, instance->serviceName() + ".Election.ResponseTimeout", 10)),
+ _state(NodeState::NodeStateInactive),
_updateCounter(0),
_max(0),
_generation(-1),
_destroy(false)
{
- map<int, NodePrx> oneway;
- for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ for(const auto& node : _nodes)
{
- oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
+ auto prx = Ice::uncheckedCast<NodePrx>(node.second->ice_oneway());
+ const_cast<map<int, shared_ptr<NodePrx>>& >(_nodesOneway)[node.first] = move(prx);
}
- const_cast<map<int, NodePrx>& >(_nodesOneway) = oneway;
-
- Ice::PropertiesPtr properties = instance->communicator()->getProperties();
- const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
- instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
- const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
- instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
- const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
- instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
}
void
@@ -204,9 +186,9 @@ NodeI::start()
// We use this lock to ensure that recovery is called before CheckTask
// is scheduled, even if timeout is 0
//
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
- _checkTask = new CheckTask(this);
+ _checkTask = make_shared<CheckTask>(shared_from_this());
_timer->schedule(_checkTask,
IceUtil::Time::seconds(static_cast<IceUtil::Int64>(_nodes.size() - static_cast<size_t>(_id)) * 2));
recovery();
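
Note that the timer tasks now hold a shared_ptr<NodeI> and are created with
shared_from_this(), which presumably requires NodeI to derive from
std::enable_shared_from_this<NodeI> (the header is not part of this diff) and means
the tasks can only be created once the node is already owned by a shared_ptr, hence
in start() rather than in the constructor. A minimal sketch of that ownership
pattern, with illustrative names:

// Sketch only: Node/Task stand in for NodeI and the IceUtil::TimerTask
// subclasses above; the enable_shared_from_this base is an assumption.
#include <memory>

class Node;

class Task
{
    const std::shared_ptr<Node> _node; // keeps the node alive while scheduled
public:
    explicit Task(std::shared_ptr<Node> node) : _node(std::move(node)) { }
};

class Node : public std::enable_shared_from_this<Node>
{
public:
    void start()
    {
        // Safe only because *this is already managed by a shared_ptr here;
        // calling shared_from_this() from the constructor would be undefined.
        auto task = std::make_shared<Task>(shared_from_this());
        (void)task; // would be handed to the timer in the real code
    }
};
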
@@ -216,17 +198,17 @@ void
NodeI::check()
{
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_destroy)
{
return;
}
assert(!_mergeTask);
- if(_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
+ if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization || _coord != _id)
{
assert(_checkTask);
- _timer->schedule(_checkTask, _electionTimeout);
+ _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count()));
return;
}
@@ -236,15 +218,15 @@ NodeI::check()
_observers->getReapedSlaves(dead);
if(!dead.empty())
{
- for(vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
+ for(const auto& node : dead)
{
- set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
+ auto q = _up.find(GroupNodeInfo(node));
if(q != _up.end())
{
if(_traceLevels->election > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
- out << "node " << _id << ": reaping slave " << *p;
+ out << "node " << _id << ": reaping slave " << node;
}
_up.erase(q);
}
@@ -272,21 +254,21 @@ NodeI::check()
// See if other groups exist for possible merge.
set<int> tmpset;
int max = -1;
- for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ for(const auto& node : _nodes)
{
- if(p->first == _id)
+ if(node.first == _id)
{
continue;
}
try
{
- if(p->second->areYouCoordinator())
+ if(node.second->areYouCoordinator())
{
- if(p->first > max)
+ if(node.first > max)
{
- max = p->first;
+ max = node.first;
}
- tmpset.insert(p->first);
+ tmpset.insert(node.first);
}
}
catch(const Ice::Exception& ex)
@@ -294,18 +276,18 @@ NodeI::check()
if(_traceLevels->election > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
- out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
+ out << "node " << _id << ": call on node " << node.first << " failed: " << ex;
}
}
}
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
// If the node state has changed while the mutex has been released
// then bail. We don't schedule a re-check since we're either
// destroyed in which case we're going to terminate or the end of
// the election/reorg will re-schedule the check.
- if(_destroy || _state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
+ if(_destroy || _state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization || _coord != _id)
{
_checkTask = 0;
return;
@@ -316,7 +298,7 @@ NodeI::check()
if(tmpset.empty())
{
assert(_checkTask);
- _timer->schedule(_checkTask, _electionTimeout);
+ _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count()));
return;
}
@@ -329,22 +311,21 @@ NodeI::check()
out << "node " << _id << ": highest priority node count: " << max;
}
- IceUtil::Time delay = IceUtil::Time::seconds(0);
+ chrono::seconds delay = 0s;
if(_id < max)
{
- // Reschedule timer proportial to p-i.
+ // Reschedule timer proportional to p-i.
delay = _mergeTimeout + _mergeTimeout * (max - _id);
if(_traceLevels->election > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
- out << "node " << _id << ": scheduling merge in " << delay.toDuration()
- << " seconds";
+ out << "node " << _id << ": scheduling merge in " << delay.count() << " seconds";
}
}
assert(!_mergeTask);
- _mergeTask = new MergeTask(this, tmpset);
- _timer->schedule(_mergeTask, delay);
+ _mergeTask = make_shared<MergeTask>(shared_from_this(), tmpset);
+ _timer->schedule(_mergeTask, IceUtil::Time::seconds(delay.count()));
}
// Called if the node has not heard from the coordinator in some time.
@@ -354,7 +335,7 @@ NodeI::timeout()
int myCoord;
string myGroup;
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
// If we're destroyed or we are our own coordinator then we're
// done.
if(_destroy || _coord == _id)
@@ -368,7 +349,7 @@ NodeI::timeout()
bool failed = false;
try
{
- map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
+ auto p = _nodes.find(myCoord);
assert(p != _nodes.end());
if(!p->second->areYouThere(myGroup, _id))
{
@@ -402,24 +383,26 @@ NodeI::merge(const set<int>& coordinatorSet)
set<int> invited;
string gp;
{
- Lock sync(*this);
+ unique_lock<recursive_mutex> lock(_mutex);
+
_mergeTask = 0;
// If the node is currently in an election, or reorganizing
// then we're done.
- if(_state == NodeStateElection || _state == NodeStateReorganization)
+ if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization)
{
return;
}
// This state change prevents this node from accepting
// invitations while the merge is executing.
- setState(NodeStateElection);
+ setState(NodeState::NodeStateElection);
// No more replica changes are permitted.
while(!_destroy && _updateCounter > 0)
{
- wait();
+ // The recursive mutex (_mutex) must only be locked once by this thread
+ _condVar.wait(lock);
}
if(_destroy)
{
@@ -437,9 +420,9 @@ NodeI::merge(const set<int>& coordinatorSet)
// Construct a set of node ids to invite. This is the union of
// _up and set of coordinators gathered in the check stage.
invited = coordinatorSet;
- for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
+ for(const auto& node : _up)
{
- invited.insert(p->id);
+ invited.insert(node.id);
}
_coord = _id;
@@ -462,7 +445,7 @@ NodeI::merge(const set<int>& coordinatorSet)
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
out << "node " << _id << ": inviting node " << *p << " to group " << gp;
}
- map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
+ auto node = _nodesOneway.find(*p);
assert(node != _nodesOneway.end());
node->second->invitation(_id, gp);
++p;
@@ -475,7 +458,7 @@ NodeI::merge(const set<int>& coordinatorSet)
// Now we wait for responses to our invitation.
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_destroy)
{
return;
@@ -492,17 +475,17 @@ NodeI::merge(const set<int>& coordinatorSet)
// Schedule the mergeContinueTask.
assert(_mergeContinueTask == 0);
- _mergeContinueTask = new MergeContinueTask(this);
+ _mergeContinueTask = make_shared<MergeContinueTask>(shared_from_this());
// At this point we may have already accepted all of the
// invitations, if so then we want to schedule the
// mergeContinue immediately.
- IceUtil::Time timeout = _mergeTimeout;
+ chrono::seconds timeout = _mergeTimeout;
if(_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted)
{
- timeout = IceUtil::Time::seconds(0);
+ timeout = 0s;
}
- _timer->schedule(_mergeContinueTask, timeout);
+ _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(timeout.count()));
}
}
@@ -512,7 +495,7 @@ NodeI::mergeContinue()
string gp;
set<GroupNodeInfo> tmpSet;
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_destroy)
{
return;
@@ -520,14 +503,14 @@ NodeI::mergeContinue()
// Copy variables for thread safety.
gp = _group;
- tmpSet = _up;
+ tmpSet = set<GroupNodeInfo>(_up);
assert(_mergeContinueTask);
_mergeContinueTask = 0;
// The node is now reorganizing.
- assert(_state == NodeStateElection);
- setState(NodeStateReorganization);
+ assert(_state == NodeState::NodeStateElection);
+ setState(NodeState::NodeStateReorganization);
if(_traceLevels->election > 0)
{
@@ -562,18 +545,18 @@ NodeI::mergeContinue()
// updates.
int maxid = -1;
LogUpdate maxllu = { -1, 0 };
- for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ for(const auto& n : tmpSet)
{
if(_traceLevels->election > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
- out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
+ out << "node id=" << n.id << " llu=" << n.llu.generation << "/" << n.llu.iteration;
}
- if(p->llu.generation > maxllu.generation ||
- (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
+ if(n.llu.generation > maxllu.generation ||
+ (n.llu.generation == maxllu.generation && n.llu.iteration > maxllu.iteration))
{
- maxid = p->id;
- maxllu = p->llu;
+ maxid = n.id;
+ maxllu = n.llu;
}
}
@@ -596,7 +579,7 @@ NodeI::mergeContinue()
}
try
{
- map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
+ auto node = _nodes.find(maxid);
assert(node != _nodes.end());
_replica->sync(node->second->sync());
}
@@ -625,7 +608,7 @@ NodeI::mergeContinue()
// state, as such we can set our _max flag.
unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1;
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(max > _max)
{
_max = max;
@@ -655,11 +638,11 @@ NodeI::mergeContinue()
}
// Tell each node to go.
- for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ for(const auto& n : tmpSet)
{
try
{
- map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
+ auto node = _nodes.find(n.id);
assert(node != _nodes.end());
node->second->ready(_id, gp, _replicaProxy, static_cast<Ice::Int>(max), maxllu.generation);
}
@@ -668,7 +651,7 @@ NodeI::mergeContinue()
if(_traceLevels->election > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
- out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
+ out << "node " << _id << ": error calling ready on " << n.id << " ex: " << ex;
}
recovery();
return;
@@ -676,7 +659,7 @@ NodeI::mergeContinue()
}
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_destroy)
{
return;
@@ -688,19 +671,19 @@ NodeI::mergeContinue()
out << "replication commencing with " << _up.size()+1 << "/" << _nodes.size()
<< " nodes with llu generation: " << maxllu.generation;
}
- setState(NodeStateNormal);
+ setState(NodeState::NodeStateNormal);
_coordinatorProxy = 0;
_generation = maxllu.generation;
assert(!_checkTask);
- _checkTask = new CheckTask(this);
- _timer->schedule(_checkTask, _electionTimeout);
+ _checkTask = make_shared<CheckTask>(shared_from_this());
+ _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count()));
}
}
void
-NodeI::invitation(int j, const string& gn, const Ice::Current&)
+NodeI::invitation(int j, string gn, const Ice::Current&)
{
if(_traceLevels->election > 0)
{
@@ -720,14 +703,14 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
int max = -1;
set<GroupNodeInfo> tmpSet;
{
- Lock sync(*this);
+ unique_lock<recursive_mutex> lock(_mutex);
if(_destroy)
{
return;
}
// If we're in the election or reorg state a merge has already
// started, so ignore the invitation.
- if(_state == NodeStateElection || _state == NodeStateReorganization)
+ if(_state == NodeState::NodeStateElection || _state == NodeState::NodeStateReorganization)
{
if(_traceLevels->election > 0)
{
@@ -751,18 +734,18 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
// The merge task is cleared in the merge. This
// ensures two invitations cannot cause a race with
// the merge.
- //_mergeTask = 0;
return;
}
- _mergeTask = 0;
+ _mergeTask = nullptr;
}
// We're now joining with another group. If we are active we
// must stop serving as a master or slave.
- setState(NodeStateElection);
+ setState(NodeState::NodeStateElection);
while(!_destroy && _updateCounter > 0)
{
- wait();
+ // The recursive mutex (_mutex) must only be locked once by this thread
+ _condVar.wait(lock);
}
if(_destroy)
{
@@ -770,7 +753,7 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
}
tmpCoord = _coord;
- tmpSet = _up;
+ tmpSet = set<GroupNodeInfo>(_up);
_coord = j;
_group = gn;
@@ -780,14 +763,14 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
Ice::IntSeq forwardedInvites;
if(tmpCoord == _id) // Forward invitation to my old members.
{
- for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ for(const auto& n : tmpSet)
{
try
{
- map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
+ auto node = _nodesOneway.find(n.id);
assert(node != _nodesOneway.end());
node->second->invitation(j, gn);
- forwardedInvites.push_back(p->id);
+ forwardedInvites.push_back(n.id);
}
catch(const Ice::Exception&)
{
@@ -800,23 +783,24 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
// everything is fine. Setting the state *after* calling accept
// can cause a race.
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_destroy)
{
return;
}
- assert(_state == NodeStateElection);
- setState(NodeStateReorganization);
+
+ assert(_state == NodeState::NodeStateElection);
+ setState(NodeState::NodeStateReorganization);
if(!_timeoutTask)
{
- _timeoutTask = new TimeoutTask(this);
- _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
+ _timeoutTask = make_shared<TimeoutTask>(shared_from_this());
+ _timer->scheduleRepeated(_timeoutTask, IceUtil::Time::seconds(_masterTimeout.count()));
}
}
try
{
- map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
+ auto node = _nodesOneway.find(j);
assert(node != _nodesOneway.end());
node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
}
@@ -828,11 +812,11 @@ NodeI::invitation(int j, const string& gn, const Ice::Current&)
}
void
-NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max, Ice::Long generation,
+NodeI::ready(int j, string gn, shared_ptr<Ice::ObjectPrx> coordinator, int max, Ice::Long generation,
const Ice::Current&)
{
- Lock sync(*this);
- if(!_destroy && _state == NodeStateReorganization && _group == gn)
+ lock_guard<recursive_mutex> lg(_mutex);
+ if(!_destroy && _state == NodeState::NodeStateReorganization && _group == gn)
{
// The coordinator must be j (this was set in the invitation).
if(_coord != j)
@@ -859,20 +843,20 @@ NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max
// Activate the replica here since the replica is now ready
// for duty.
- setState(NodeStateNormal);
+ setState(NodeState::NodeStateNormal);
_coordinatorProxy = coordinator;
if(!_checkTask)
{
- _checkTask = new CheckTask(this);
- _timer->schedule(_checkTask, _electionTimeout);
+ _checkTask = make_shared<CheckTask>(shared_from_this());
+ _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count()));
}
}
}
void
-NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, const Ice::ObjectPrx& observer,
- const LogUpdate& llu, int max, const Ice::Current&)
+NodeI::accept(int j, string gn, Ice::IntSeq forwardedInvites, shared_ptr<Ice::ObjectPrx> observer, LogUpdate llu,
+ int max, const Ice::Current&)
{
// Verify that j exists in our node set.
if(_nodes.find(j) == _nodes.end())
@@ -882,8 +866,8 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons
return;
}
- Lock sync(*this);
- if(!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
+ lock_guard<recursive_mutex> lg(_mutex);
+ if(!_destroy && _state == NodeState::NodeStateElection && _group == gn && _coord == _id)
{
_up.insert(GroupNodeInfo(j, llu, observer));
@@ -928,7 +912,7 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons
// merge continue immediately. Otherwise, we let the existing
// merge() schedule continue.
if((_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted) &&
- _mergeContinueTask && _timer->cancel(_mergeContinueTask))
+ _mergeContinueTask && _timer->cancel(_mergeContinueTask))
{
_timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
}
@@ -938,18 +922,18 @@ NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, cons
bool
NodeI::areYouCoordinator(const Ice::Current&) const
{
- Lock sync(*this);
- return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
+ lock_guard<recursive_mutex> lg(_mutex);
+ return _state != NodeState::NodeStateElection && _state != NodeState::NodeStateReorganization && _coord == _id;
}
bool
-NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
+NodeI::areYouThere(string gn, int j, const Ice::Current&) const
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
NodeI::sync(const Ice::Current&) const
{
return _replica->getSync();
@@ -959,12 +943,9 @@ NodeInfoSeq
NodeI::nodes(const Ice::Current&) const
{
NodeInfoSeq seq;
- for(map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
+ for(const auto& n : _nodes)
{
- NodeInfo ni;
- ni.id = q->first;
- ni.n = q->second;
- seq.push_back(ni);
+ seq.push_back({ n.first, n.second });
}
return seq;
@@ -973,21 +954,18 @@ NodeI::nodes(const Ice::Current&) const
QueryInfo
NodeI::query(const Ice::Current&) const
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
QueryInfo info;
info.id = _id;
info.coord = _coord;
info.group = _group;
info.replica = _replicaProxy;
info.state = _state;
- info.max = static_cast<Ice::Int>(_max);
+ info.max = static_cast<int>(_max);
- for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
+ for(const auto& gni : _up)
{
- GroupInfo gi;
- gi.id = p->id;
- gi.llu = p->llu;
- info.up.push_back(gi);
+ info.up.push_back({ gni.id, gni.llu });
}
return info;
@@ -996,7 +974,7 @@ NodeI::query(const Ice::Current&) const
void
NodeI::recovery(Ice::Long generation)
{
- Lock sync(*this);
+ unique_lock<recursive_mutex> lock(_mutex);
// Ignore the recovery if the node has already advanced a
// generation.
@@ -1005,10 +983,10 @@ NodeI::recovery(Ice::Long generation)
return;
}
- setState(NodeStateInactive);
+ setState(NodeState::NodeStateInactive);
while(!_destroy && _updateCounter > 0)
{
- wait();
+ _condVar.wait(lock);
}
if(_destroy)
{
@@ -1042,23 +1020,23 @@ NodeI::recovery(Ice::Long generation)
}
if(!_checkTask)
{
- _checkTask = new CheckTask(this);
- _timer->schedule(_checkTask, _electionTimeout);
+ _checkTask = make_shared<CheckTask>(shared_from_this());
+ _timer->schedule(_checkTask, IceUtil::Time::seconds(_electionTimeout.count()));
}
}
void
NodeI::destroy()
{
- Lock sync(*this);
+ unique_lock<recursive_mutex> lock(_mutex);
assert(!_destroy);
while(_updateCounter > 0)
{
- wait();
+ _condVar.wait(lock);
}
_destroy = true;
- notifyAll();
+ _condVar.notify_all();
// Cancel the timers.
if(_checkTask)
@@ -1083,10 +1061,10 @@ NodeI::destroy()
// A node should only receive an observer init call if the node is
// reorganizing and it's not the coordinator.
void
-NodeI::checkObserverInit(Ice::Long /*generation*/)
+NodeI::checkObserverInit(Ice::Long)
{
- Lock sync(*this);
- if(_state != NodeStateReorganization)
+ lock_guard<recursive_mutex> lg(_mutex);
+ if(_state != NodeState::NodeStateReorganization)
{
throw ObserverInconsistencyException("init cannot block when state != NodeStateReorganization");
}
@@ -1097,23 +1075,21 @@ NodeI::checkObserverInit(Ice::Long /*generation*/)
}
// Notify the node that we're about to start an update.
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
{
bool majority = _observers->check();
- Lock sync(*this);
+ unique_lock<recursive_mutex> lock(_mutex);
// If we're actively replicating & lost the majority of our replicas then recover.
- if(!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
+ if(!_coordinatorProxy && !_destroy && _state == NodeState::NodeStateNormal && !majority)
{
recovery();
}
- while(!_destroy && _state != NodeStateNormal)
- {
- wait();
- }
+ _condVar.wait(lock, [this] { return _destroy || _state == NodeState::NodeStateNormal; } );
+
if(_destroy)
{
throw Ice::UnknownException(file, line);
@@ -1127,11 +1103,11 @@ NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
}
bool
-NodeI::updateMaster(const char* /*file*/, int /*line*/)
+NodeI::updateMaster(const char*, int)
{
bool majority = _observers->check();
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
// If the node is destroyed, or is not a coordinator then we're
// done.
@@ -1141,13 +1117,13 @@ NodeI::updateMaster(const char* /*file*/, int /*line*/)
}
// If we've lost the majority of our replicas then recover.
- if(_state == NodeStateNormal && !majority)
+ if(_state == NodeState::NodeStateNormal && !majority)
{
recovery();
}
// If we're not replicating then we're done.
- if(_state != NodeStateNormal)
+ if(_state != NodeState::NodeStateNormal)
{
return false;
}
@@ -1157,14 +1133,13 @@ NodeI::updateMaster(const char* /*file*/, int /*line*/)
return true;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
{
- Lock sync(*this);
- while(!_destroy && _state != NodeStateNormal)
- {
- wait();
- }
+ unique_lock<recursive_mutex> lock(_mutex);
+
+ _condVar.wait(lock, [this] { return _destroy || _state == NodeState::NodeStateNormal; });
+
if(_destroy)
{
throw Ice::UnknownException(file, line);
@@ -1177,12 +1152,13 @@ NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
void
NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
+
if(_destroy)
{
throw Ice::UnknownException(file, line);
}
- if(_state != NodeStateNormal)
+ if(_state != NodeState::NodeStateNormal)
{
throw ObserverInconsistencyException("update called on inactive node");
}
@@ -1200,13 +1176,13 @@ NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
void
NodeI::finishUpdate()
{
- Lock sync(*this);
+ lock_guard<recursive_mutex> lg(_mutex);
assert(!_destroy);
--_updateCounter;
assert(_updateCounter >= 0);
if(_updateCounter == 0)
{
- notifyAll();
+ _condVar.notify_all();
}
}
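
The finishUpdate/merge pairing above replaces the old IceUtil::Monitor with an
explicit mutex and condition variable. Since the waits take a
unique_lock<recursive_mutex>, the _condVar member (declared in NodeI.h, not shown
here) would have to be a std::condition_variable_any; a plain
std::condition_variable only accepts unique_lock<std::mutex>. A minimal sketch of
the pattern, with illustrative names:

// Sketch of the wait/notify pattern; Counter is an illustrative stand-in.
#include <condition_variable>
#include <mutex>

struct Counter
{
    std::recursive_mutex mutex;
    std::condition_variable_any condVar; // works with any lockable type
    int updateCounter = 0;

    void finishUpdate()
    {
        std::lock_guard<std::recursive_mutex> lg(mutex);
        if(--updateCounter == 0)
        {
            condVar.notify_all(); // wake threads blocked in waitForIdle()
        }
    }

    void waitForIdle()
    {
        std::unique_lock<std::recursive_mutex> lock(mutex);
        // The predicate form re-checks the condition after each wakeup.
        condVar.wait(lock, [this] { return updateCounter == 0; });
    }
};
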
@@ -1217,13 +1193,13 @@ stateToString(NodeState s)
{
switch(s)
{
- case NodeStateInactive:
+ case NodeState::NodeStateInactive:
return "inactive";
- case NodeStateElection:
+ case NodeState::NodeStateElection:
return "election";
- case NodeStateReorganization:
+ case NodeState::NodeStateReorganization:
return "reorganization";
- case NodeStateNormal:
+ case NodeState::NodeStateNormal:
return "normal";
}
return "unknown";
@@ -1241,10 +1217,10 @@ NodeI::setState(NodeState s)
out << "node " << _id << ": transition from " << stateToString(_state) << " to "
<< stateToString(s);
}
- _state = s;
- if(_state == NodeStateNormal)
+ _state = move(s);
+ if(_state == NodeState::NodeStateNormal)
{
- notifyAll();
+ _condVar.notify_all();
}
}
}