Diffstat (limited to 'cpp/src/IceStorm/NodeI.cpp')
-rw-r--r--   cpp/src/IceStorm/NodeI.cpp   1246
1 file changed, 1246 insertions(+), 0 deletions(-)
diff --git a/cpp/src/IceStorm/NodeI.cpp b/cpp/src/IceStorm/NodeI.cpp
new file mode 100644
index 00000000000..b0ff4bb1bb5
--- /dev/null
+++ b/cpp/src/IceStorm/NodeI.cpp
@@ -0,0 +1,1246 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceStorm/NodeI.h>
+#include <IceStorm/Observers.h>
+#include <IceStorm/TraceLevels.h>
+
+using namespace IceStorm;
+using namespace IceStormElection;
+using namespace std;
+
+namespace
+{
+
+bool operator==(const GroupNodeInfo& info, int id)
+{
+ return info.id == id;
+}
+
+class CheckTask : public IceUtil::TimerTask
+{
+ const NodeIPtr _node;
+
+public:
+
+ CheckTask(const NodeIPtr& node) : _node(node) { }
+ virtual void runTimerTask()
+ {
+ _node->check();
+ }
+};
+
+class MergeTask : public IceUtil::TimerTask
+{
+ const NodeIPtr _node;
+ const set<int> _s;
+
+public:
+
+ MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s) { }
+ virtual void runTimerTask()
+ {
+ _node->merge(_s);
+ }
+};
+
+class MergeContinueTask : public IceUtil::TimerTask
+{
+ const NodeIPtr _node;
+
+public:
+
+ MergeContinueTask(const NodeIPtr& node) : _node(node) { }
+ virtual void runTimerTask()
+ {
+ _node->mergeContinue();
+ }
+};
+
+class TimeoutTask: public IceUtil::TimerTask
+{
+ const NodeIPtr _node;
+
+public:
+
+ TimeoutTask(const NodeIPtr& node) : _node(node) { }
+ virtual void runTimerTask()
+ {
+ _node->timeout();
+ }
+};
+
+}
+
+namespace
+{
+
+LogUpdate emptyLU = {0, 0};
+
+}
+
+GroupNodeInfo::GroupNodeInfo(int i) :
+ id(i), llu(emptyLU)
+{
+}
+
+GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
+ id(i), llu(l), observer(o)
+{
+}
+
+bool
+GroupNodeInfo::operator<(const GroupNodeInfo& rhs) const
+{
+ return id < rhs.id;
+}
+
+bool
+GroupNodeInfo::operator==(const GroupNodeInfo& rhs) const
+{
+ return id == rhs.id;
+}
+
+Replica::~Replica()
+{
+ //cout << "~Replica" << endl;
+}
+
+namespace
+{
+static IceUtil::Time
+getTimeout(const string& key, int def, const Ice::PropertiesPtr& properties, const TraceLevelsPtr& traceLevels)
+{
+ int t = properties->getPropertyAsIntWithDefault(key, def);
+ if(t < 0)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << traceLevels->electionCat << ": " << key << " < 0; adjusted to 1";
+ t = 1;
+ }
+ return IceUtil::Time::seconds(t);
+}
+
+static string
+toString(const set<int>& s)
+{
+ ostringstream os;
+ os << "(";
+ for(set<int>::const_iterator p = s.begin(); p != s.end(); ++p)
+ {
+ if(p != s.begin())
+ {
+ os << ",";
+ }
+ os << *p;
+ }
+ os << ")";
+ return os.str();
+}
+
+}
+
+NodeI::NodeI(const InstancePtr& instance,
+ const ReplicaPtr& replica,
+ const Ice::ObjectPrx& replicaProxy,
+ int id, const map<int, NodePrx>& nodes) :
+ _timer(instance->timer()),
+ _traceLevels(instance->traceLevels()),
+ _observers(instance->observers()),
+ _replica(replica),
+ _replicaProxy(replicaProxy),
+ _id(id),
+ _nodes(nodes),
+ _state(NodeStateInactive),
+ _updateCounter(0),
+ _max(0),
+ _generation(-1),
+ _destroy(false)
+{
+ map<int, NodePrx> oneway;
+ for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
+ }
+ const_cast<map<int, NodePrx>& >(_nodesOneway) = oneway;
+
+ Ice::PropertiesPtr properties = instance->communicator()->getProperties();
+ const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
+ instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
+ const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
+ instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
+ const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
+ instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
+}
+
+NodeI::~NodeI()
+{
+ //cout << "~NodeI" << endl;
+}
+
+void
+NodeI::start()
+{
+ // As an optimization we want the initial election to occur as
+ // soon as possible.
+ //
+ // However, if the node triggered the election immediately upon
+ // startup, lower priority nodes would start elections of their own,
+ // denying a higher priority node the opportunity to start the
+ // election that results in it becoming the leader. Things would
+ // eventually reach a stable state, but it would take longer.
+ //
+ // Therefore we schedule the initial election check with a delay
+ // inversely proportional to our priority: for example, with 4 nodes,
+ // node 3 runs its first check after 2 seconds while node 0 waits 8
+ // seconds.
+ //
+ // By setting _checkTask first we stop recovery() from setting it
+ // to the regular election interval.
+ //
+ _checkTask = new CheckTask(this);
+ _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
+ recovery();
+}
+
+void
+NodeI::check()
+{
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ assert(!_mergeTask);
+
+ if(_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
+ {
+ assert(_checkTask);
+ _timer->schedule(_checkTask, _electionTimeout);
+ return;
+ }
+
+ // Next get the set of nodes that were detected as unreachable
+ // from the replica and remove them from our slave list.
+ vector<int> dead;
+ _observers->getReapedSlaves(dead);
+ if(!dead.empty())
+ {
+ for(vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
+ {
+ set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
+ if(q != _up.end())
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": reaping slave " << *p;
+ }
+ _up.erase(q);
+ }
+ }
+
+ // If we no longer have the majority of the nodes under our
+ // care then we need to stop our replica.
+ if(_up.size() < _nodes.size()/2)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": stopping replica";
+ }
+ // Clear _checkTask -- recovery() will reset the
+ // timer.
+ assert(_checkTask);
+ _checkTask = 0;
+ recovery();
+ return;
+ }
+ }
+ }
+
+ // See if other groups exist for possible merge.
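+ // tmpset collects the ids of the other coordinators we can reach;
+ // max tracks the highest such id.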
+ set<int> tmpset;
+ int max = -1;
+ for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ if(p->first == _id)
+ {
+ continue;
+ }
+ try
+ {
+ if(p->second->areYouCoordinator())
+ {
+ if(p->first > max)
+ {
+ max = p->first;
+ }
+ tmpset.insert(p->first);
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
+ }
+ }
+ }
+
+ Lock sync(*this);
+
+ // If the node state has changed while the mutex was released then
+ // bail out. We don't schedule a re-check: either we're destroyed, in
+ // which case we're going to terminate, or the end of the
+ // election/reorg will re-schedule the check.
+ if(_destroy || _state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
+ {
+ return;
+ }
+
+ // If we didn't find any coordinators then we're done. Reschedule
+ // the next check and terminate.
+ if(tmpset.empty())
+ {
+ assert(_checkTask);
+ _timer->schedule(_checkTask, _electionTimeout);
+ return;
+ }
+
+ // _checkTask == 0 means that the check isn't scheduled.
+ _checkTask = 0;
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": highest priority node count: " << max;
+ }
+
+ IceUtil::Time delay = IceUtil::Time::seconds(0);
+ if(_id < max)
+ {
+ // Schedule the merge with a delay proportional to max - _id.
+ delay = _mergeTimeout + _mergeTimeout * (max - _id);
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": scheduling merge in " << delay.toDuration()
+ << " seconds";
+ }
+ }
+
+ assert(!_mergeTask);
+ _mergeTask = new MergeTask(this, tmpset);
+ _timer->schedule(_mergeTask, delay);
+}
+
+// Called if the node has not heard from the coordinator in some time.
+void
+NodeI::timeout()
+{
+ int myCoord;
+ string myGroup;
+ {
+ Lock sync(*this);
+ // If we're destroyed or we are our own coordinator then we're
+ // done.
+ if(_destroy || _coord == _id)
+ {
+ return;
+ }
+ myCoord = _coord;
+ myGroup = _group;
+ }
+
+ bool failed = false;
+ try
+ {
+ map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
+ assert(p != _nodes.end());
+ if(!p->second->areYouThere(myGroup, _id))
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": lost connection to coordinator " << myCoord
+ << ": areYouThere returned false";
+ }
+ failed = true;
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": lost connection to coordinator " << myCoord << ": " << ex;
+ }
+ failed = true;
+ }
+ if(failed)
+ {
+ recovery();
+ }
+}
+
+void
+NodeI::merge(const set<int>& coordinatorSet)
+{
+ set<int> invited;
+ string gp;
+ {
+ Lock sync(*this);
+ _mergeTask = 0;
+
+ // If the node is currently in an election or reorganization,
+ // then we're done.
+ if(_state == NodeStateElection || _state == NodeStateReorganization)
+ {
+ return;
+ }
+
+ // This state change prevents this node from accepting
+ // invitations while the merge is executing.
+ setState(NodeStateElection);
+
+ // No more replica changes are permitted.
+ while(!_destroy && _updateCounter > 0)
+ {
+ wait();
+ }
+ if(_destroy)
+ {
+ return;
+ }
+
+ ostringstream os;
+ os << _id << ":" << IceUtil::generateUUID();
+ _group = os.str();
+ gp = _group;
+
+ _invitesAccepted.clear();
+ _invitesIssued.clear();
+
+ // Construct a set of node ids to invite. This is the union of
+ // _up and the set of coordinators gathered in the check stage.
+ invited = coordinatorSet;
+ for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
+ {
+ invited.insert(p->id);
+ }
+
+ _coord = _id;
+ _up.clear();
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": inviting " << toString(invited) << " to group " << _group;
+ }
+ }
+
+ set<int>::iterator p = invited.begin();
+ while(p != invited.end())
+ {
+ try
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": inviting node " << *p << " to group " << gp;
+ }
+ map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
+ assert(node != _nodesOneway.end());
+ node->second->invitation(_id, gp);
+ ++p;
+ }
+ catch(const Ice::Exception&)
+ {
+ invited.erase(p++);
+ }
+ }
+
+ // Now we wait for responses to our invitation.
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+
+ // Add each of the invited nodes to the set of issued invitations.
+ _invitesIssued.insert(invited.begin(), invited.end());
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": invites pending: " << toString(_invitesIssued);
+ }
+
+ // Schedule the mergeContinueTask.
+ assert(_mergeContinueTask == 0);
+ _mergeContinueTask = new MergeContinueTask(this);
+
+ // At this point we may have already accepted all of the
+ // invitations; if so, schedule the mergeContinue to run
+ // immediately.
+ IceUtil::Time timeout = _mergeTimeout;
+ if(_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted)
+ {
+ timeout = IceUtil::Time::seconds(0);
+ }
+ _timer->schedule(_mergeContinueTask, timeout);
+ }
+}
+
+void
+NodeI::mergeContinue()
+{
+ string gp;
+ set<GroupNodeInfo> tmpSet;
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+
+ // Copy variables for thread safety.
+ gp = _group;
+ tmpSet = _up;
+
+ assert(_mergeContinueTask);
+ _mergeContinueTask = 0;
+
+ // The node is now reorganizing.
+ assert(_state == NodeStateElection);
+ setState(NodeStateReorganization);
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": coordinator for " << (tmpSet.size() +1) << " nodes (including myself)";
+ }
+
+ // Now we need to decide whether we can start serving content. If
+ // we're on initial startup then we need all nodes to participate
+ // in the election. If we're running a subsequent election then we
+ // need a majority of the nodes to be active in order to start
+ // running.
+ unsigned int ingroup = tmpSet.size();
+ if((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size()/2)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": not enough nodes " << (ingroup+1) << "/" << _nodes.size()
+ << " for replication to commence";
+ if(_max != _nodes.size())
+ {
+ out << " (require full participation for startup)";
+ }
+ }
+ recovery();
+ return;
+ }
+ }
+
+ // Find out who has the highest available set of database
+ // updates.
+ int maxid = -1;
+ LogUpdate maxllu = { -1, 0 };
+ set<GroupNodeInfo>::const_iterator p;
+ for(p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
+ }
+ if(p->llu.generation > maxllu.generation ||
+ (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
+ {
+ maxid = p->id;
+ maxllu = p->llu;
+ }
+ }
+
+ LogUpdate myLlu = _replica->getLastLogUpdate();
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node id=" << _id << " llu=" << myLlu.generation << "/" << myLlu.iteration;
+ }
+
+ // If it's not us, then we have to get the latest database state
+ // from the replica with the most recent log update.
+ if(maxllu > myLlu)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": syncing database state with node " << maxid;
+ }
+ try
+ {
+ map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
+ assert(node != _nodes.end());
+ _replica->sync(node->second->sync());
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": syncing database state with node "
+ << maxid << " failed: " << ex;
+ }
+ recovery();
+ return;
+ }
+ }
+ else
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": I have the latest database state.";
+ }
+ }
+
+ // At this point we've ensured that we have the latest database
+ // state; as such, we can update our _max value.
+ unsigned int max = tmpSet.size() + 1;
+ {
+ Lock sync(*this);
+ if(max > _max)
+ {
+ _max = max;
+ }
+ max = _max;
+ }
+
+ // Prepare the LogUpdate for this generation.
+ maxllu.generation++;
+ maxllu.iteration = 0;
+
+ try
+ {
+ // Tell the replica that it is now the master with the given
+ // set of slaves and llu generation.
+ _replica->initMaster(tmpSet, maxllu);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": initMaster failed: " << ex;
+ }
+ recovery();
+ return;
+ }
+
+ // Tell each node to go.
+ for(p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ {
+ try
+ {
+ map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
+ assert(node != _nodes.end());
+ node->second->ready(_id, gp, _replicaProxy, max, maxllu.generation);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
+ }
+ recovery();
+ return;
+ }
+ }
+
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": reporting for duty in group " << _group << " as coordinator. ";
+ out << "replication commencing with " << _up.size()+1 << "/" << _nodes.size()
+ << " nodes with llu generation: " << maxllu.generation;
+ }
+ setState(NodeStateNormal);
+ _coordinatorProxy = 0;
+
+ _generation = maxllu.generation;
+
+ assert(!_checkTask);
+ _checkTask = new CheckTask(this);
+ _timer->schedule(_checkTask, _electionTimeout);
+ }
+}
+
+void
+NodeI::invitation(int j, const string& gn, const Ice::Current&)
+{
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": invitation from " << j << " to group " << gn;
+ }
+
+ // Verify that j exists in our node set.
+ if(_nodes.find(j) == _nodes.end())
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << _traceLevels->electionCat << ": ignoring invitation from unknown node " << j;
+ return;
+ }
+
+ int tmpCoord = -1;
+ int max = -1;
+ set<GroupNodeInfo> tmpSet;
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ // If we're in the election or reorg state, a merge has already
+ // started, so ignore the invitation.
+ if(_state == NodeStateElection || _state == NodeStateReorganization)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": invitation ignored";
+ }
+ return;
+ }
+
+ //
+ // Upon receipt of an invitation we cancel any pending merge
+ // task.
+ //
+ if(_mergeTask)
+ {
+ // If the timer cannot cancel the task, the timer has already
+ // fired and the merge is currently in progress, in which case
+ // we reject the invitation.
+ if(!_timer->cancel(_mergeTask))
+ {
+ // The merge task is cleared in the merge. This
+ // ensures two invitations cannot cause a race with
+ // the merge.
+ //_mergeTask = 0;
+ return;
+ }
+ _mergeTask = 0;
+ }
+
+ // We're now joining with another group. If we are active we
+ // must stop serving as a master or slave.
+ setState(NodeStateElection);
+ while(!_destroy && _updateCounter > 0)
+ {
+ wait();
+ }
+ if(_destroy)
+ {
+ return;
+ }
+
+ tmpCoord = _coord;
+ tmpSet = _up;
+
+ _coord = j;
+ _group = gn;
+ max = _max;
+ }
+
+ Ice::IntSeq forwardedInvites;
+ if(tmpCoord == _id) // Forward invitation to my old members.
+ {
+ for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
+ {
+ try
+ {
+ map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
+ assert(node != _nodesOneway.end());
+ node->second->invitation(j, gn);
+ forwardedInvites.push_back(p->id);
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ }
+ }
+
+ // Set the state and timer before calling accept. This ensures
+ // that if ready is called immediately after accept, the node is
+ // already in the reorganization state. Setting the state *after*
+ // calling accept can cause a race.
+ {
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ assert(_state == NodeStateElection);
+ setState(NodeStateReorganization);
+ if(!_timeoutTask)
+ {
+ _timeoutTask = new TimeoutTask(this);
+ _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
+ }
+ }
+
+ try
+ {
+ map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
+ assert(node != _nodesOneway.end());
+ node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
+ }
+ catch(const Ice::Exception&)
+ {
+ recovery();
+ return;
+ }
+}
+
+void
+NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max, Ice::Long generation,
+ const Ice::Current&)
+{
+ Lock sync(*this);
+ if(!_destroy && _state == NodeStateReorganization && _group == gn)
+ {
+ // The coordinator must be j (this was set in the invitation).
+ if(_coord != j)
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << _traceLevels->electionCat << ": ignoring ready call from replica node " << j
+ << " (real coordinator is " << _coord << ")";
+ return;
+ }
+
+ // Here we've already validated j in the invite call
+ // (otherwise _group != gn).
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": reporting for duty in group " << gn << " with coordinator " << j;
+ }
+
+ if(static_cast<unsigned int>(max) > _max)
+ {
+ _max = max;
+ }
+ _generation = generation;
+
+ // Activate the replica here since the replica is now ready
+ // for duty.
+ setState(NodeStateNormal);
+ _coordinatorProxy = coordinator;
+
+ if(!_checkTask)
+ {
+ _checkTask = new CheckTask(this);
+ _timer->schedule(_checkTask, _electionTimeout);
+ }
+ }
+}
+
+void
+NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, const Ice::ObjectPrx& observer,
+ const LogUpdate& llu, int max, const Ice::Current&)
+{
+ // Verify that j exists in our node set.
+ if(_nodes.find(j) == _nodes.end())
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << _traceLevels->electionCat << ": ignoring accept from unknown node " << j;
+ return;
+ }
+
+ Lock sync(*this);
+ if(!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
+ {
+ _up.insert(GroupNodeInfo(j, llu, observer));
+
+ if(static_cast<unsigned int>(max) > _max)
+ {
+ _max = max;
+ }
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": accept " << j << " forward invites (";
+ for(Ice::IntSeq::const_iterator p = forwardedInvites.begin(); p != forwardedInvites.end(); ++p)
+ {
+ if(p != forwardedInvites.begin())
+ {
+ out << ",";
+ }
+ out << *p;
+ }
+ out << ") with llu "
+ << llu.generation << "/" << llu.iteration << " into group " << gn
+ << " group size " << (_up.size() + 1);
+ }
+
+ // Add each of the forwarded invites to the list of issued
+ // invitations. This doesn't use set_union since
+ // forwardedInvites may not be sorted.
+ _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
+ // We've accepted the invitation from node j.
+ _invitesAccepted.insert(j);
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": invites pending: " << toString(_invitesIssued)
+ << " invites accepted: " << toString(_invitesAccepted);
+ }
+
+ // If invitations have been accepted from all nodes and the
+ // merge continue task has already been scheduled, then
+ // reschedule it to run immediately. Otherwise, we let the
+ // schedule set up by merge() stand.
+ if((_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted) &&
+ _mergeContinueTask && _timer->cancel(_mergeContinueTask))
+ {
+ _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
+ }
+ }
+}
+
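+// Returns true if this node is the coordinator of its group and is not
+// currently in an election or reorganization.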
+bool
+NodeI::areYouCoordinator(const Ice::Current&) const
+{
+ Lock sync(*this);
+ return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
+}
+
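+// Returns true if this node coordinates group gn and considers node j
+// one of its members.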
+bool
+NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
+{
+ Lock sync(*this);
+ return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
+}
+
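+// Returns the replica's sync proxy, which other nodes use to transfer
+// this replica's database state.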
+Ice::ObjectPrx
+NodeI::sync(const Ice::Current&) const
+{
+ return _replica->getSync();
+}
+
+NodeInfoSeq
+NodeI::nodes(const Ice::Current&) const
+{
+ NodeInfoSeq seq;
+ for(map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
+ {
+ NodeInfo ni;
+ ni.id = q->first;
+ ni.n = q->second;
+ seq.push_back(ni);
+ }
+
+ return seq;
+}
+
+QueryInfo
+NodeI::query(const Ice::Current&) const
+{
+ Lock sync(*this);
+ QueryInfo info;
+ info.id = _id;
+ info.coord = _coord;
+ info.group = _group;
+ info.replica = _replicaProxy;
+ info.state = _state;
+ info.max = _max;
+
+ for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
+ {
+ GroupInfo gi;
+ gi.id = p->id;
+ gi.llu = p->llu;
+ info.up.push_back(gi);
+ }
+
+ return info;
+}
+
+void
+NodeI::recovery(Ice::Long generation)
+{
+ Lock sync(*this);
+
+ // Ignore the recovery if the node has already advanced a
+ // generation.
+ if(generation != -1 && generation != _generation)
+ {
+ return;
+ }
+
+ setState(NodeStateInactive);
+ while(!_destroy && _updateCounter > 0)
+ {
+ wait();
+ }
+ if(_destroy)
+ {
+ return;
+ }
+
+ ostringstream os;
+ os << _id << ":" << IceUtil::generateUUID();
+ _group = os.str();
+
+ _generation = -1;
+ _coord = _id;
+ _up.clear();
+
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": creating new self-coordinated group " << _group;
+ }
+
+ // Reset the timer states.
+ if(_mergeTask)
+ {
+ _timer->cancel(_mergeTask);
+ _mergeTask = 0;
+ }
+ if(_timeoutTask)
+ {
+ _timer->cancel(_timeoutTask);
+ _timeoutTask = 0;
+ }
+ if(!_checkTask)
+ {
+ _checkTask = new CheckTask(this);
+ _timer->schedule(_checkTask, _electionTimeout);
+ }
+}
+
+void
+NodeI::destroy()
+{
+ Lock sync(*this);
+ assert(!_destroy);
+
+ while(_updateCounter > 0)
+ {
+ wait();
+ }
+ _destroy = true;
+ notifyAll();
+
+ // Cancel the timers.
+ if(_checkTask)
+ {
+ _timer->cancel(_checkTask);
+ _checkTask = 0;
+ }
+
+ if(_timeoutTask)
+ {
+ _timer->cancel(_timeoutTask);
+ _timeoutTask = 0;
+ }
+
+ if(_mergeTask)
+ {
+ _timer->cancel(_mergeTask);
+ _mergeTask = 0;
+ }
+}
+
+// A node should only receive an observer init call if the node is
+// reorganizing and it's not the coordinator.
+void
+NodeI::checkObserverInit(Ice::Long generation)
+{
+ Lock sync(*this);
+ if(_state != NodeStateReorganization)
+ {
+ throw ObserverInconsistencyException("init cannot block when state != NodeStateReorganization");
+ }
+ if(_coord == _id)
+ {
+ throw ObserverInconsistencyException("init called on coordinator");
+ }
+}
+
+// Notify the node that we're about to start an update.
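+// Blocks until the node is active. If this node is the master, the
+// update counter is incremented and a null proxy is returned (the
+// update is applied locally); otherwise the coordinator's proxy is
+// returned.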
+Ice::ObjectPrx
+NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
+{
+ bool majority = _observers->check();
+
+ Lock sync(*this);
+
+ // If we're actively replicating and have lost the majority of our replicas, then recover.
+ if(!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
+ {
+ recovery();
+ }
+
+ while(!_destroy && _state != NodeStateNormal)
+ {
+ wait();
+ }
+ if(_destroy)
+ {
+ throw Ice::UnknownException(file, line);
+ }
+ if(!_coordinatorProxy)
+ {
+ ++_updateCounter;
+ }
+ generation = _generation;
+ return _coordinatorProxy;
+}
+
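+// Called on the master to start an update. Returns false if the node
+// has been destroyed, is a slave, or is not in the normal state;
+// otherwise the update counter is incremented and true is returned.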
+bool
+NodeI::updateMaster(const char* file, int line)
+{
+ bool majority = _observers->check();
+
+ Lock sync(*this);
+
+ // If the node is destroyed, or is a slave (it has a coordinator
+ // proxy), then we're done.
+ if(_destroy || _coordinatorProxy)
+ {
+ return false;
+ }
+
+ // If we've lost the majority of our replicas then recover.
+ if(_state == NodeStateNormal && !majority)
+ {
+ recovery();
+ }
+
+ // If we're not replicating then we're done.
+ if(_state != NodeStateNormal)
+ {
+ return false;
+ }
+
+ // Otherwise adjust the update counter, and return success.
+ ++_updateCounter;
+ return true;
+}
+
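+// Waits until the node is active, records the current generation and
+// increments the update counter. Returns the coordinator's proxy (null
+// if this node is the master).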
+Ice::ObjectPrx
+NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
+{
+ Lock sync(*this);
+ while(!_destroy && _state != NodeStateNormal)
+ {
+ wait();
+ }
+ if(_destroy)
+ {
+ throw Ice::UnknownException(file, line);
+ }
+ generation = _generation;
+ ++_updateCounter;
+ return _coordinatorProxy;
+}
+
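+// Called when an observer update arrives from the master. Valid only on
+// an active slave whose generation matches; otherwise an
+// ObserverInconsistencyException is raised.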
+void
+NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
+{
+ Lock sync(*this);
+ if(_destroy)
+ {
+ throw Ice::UnknownException(file, line);
+ }
+ if(_state != NodeStateNormal)
+ {
+ throw ObserverInconsistencyException("update called on inactive node");
+ }
+ if(!_coordinatorProxy)
+ {
+ throw ObserverInconsistencyException("update called on the master");
+ }
+ if(generation != _generation)
+ {
+ throw ObserverInconsistencyException("invalid generation");
+ }
+ ++_updateCounter;
+}
+
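+// Marks the end of an update: decrements the update counter and wakes
+// any threads waiting for pending updates to drain.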
+void
+NodeI::finishUpdate()
+{
+ Lock sync(*this);
+ assert(!_destroy);
+ --_updateCounter;
+ assert(_updateCounter >= 0);
+ if(_updateCounter == 0)
+ {
+ notifyAll();
+ }
+}
+
+namespace
+{
+static string
+stateToString(NodeState s)
+{
+ switch(s)
+ {
+ case NodeStateInactive:
+ return "inactive";
+ case NodeStateElection:
+ return "election";
+ case NodeStateReorganization:
+ return "reorganization";
+ case NodeStateNormal:
+ return "normal";
+ }
+ return "unknown";
+}
+}
+
+void
+NodeI::setState(NodeState s)
+{
+ if(s != _state)
+ {
+ if(_traceLevels->election > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
+ out << "node " << _id << ": transition from " << stateToString(_state) << " to "
+ << stateToString(s);
+ }
+ _state = s;
+ if(_state == NodeStateNormal)
+ {
+ notifyAll();
+ }
+ }
+}