// // Copyright (c) ZeroC, Inc. All rights reserved. // #ifndef NODE_I_H #define NODE_I_H #include #include #include #include #include #include #include #include namespace IceStormElection { class Observers; class NodeI final : public Node, public std::enable_shared_from_this { public: NodeI(const std::shared_ptr&, std::shared_ptr, std::shared_ptr, int, const std::map>&); void start(); void check(); void timeout(); void merge(const std::set&); void mergeContinue(); void invitation(int, std::string, const Ice::Current&) override; void ready(int, std::string, std::shared_ptr, int, Ice::Long, const Ice::Current&) override; void accept(int, std::string, Ice::IntSeq, std::shared_ptr, LogUpdate, int, const Ice::Current&) override; bool areYouCoordinator(const Ice::Current&) const override; bool areYouThere(std::string, int, const Ice::Current&) const override; std::shared_ptr sync(const Ice::Current&) const override; NodeInfoSeq nodes(const Ice::Current&) const override; QueryInfo query(const Ice::Current&) const override; void recovery(Ice::Long = -1); void destroy(); // Notify the node that we're about to start an update. void checkObserverInit(Ice::Long); std::shared_ptr startUpdate(Ice::Long&, const char*, int); std::shared_ptr startCachedRead(Ice::Long&, const char*, int); void startObserverUpdate(Ice::Long, const char*, int); bool updateMaster(const char*, int); // The node has completed the update. void finishUpdate(); private: void setState(NodeState); const IceUtil::TimerPtr _timer; const std::shared_ptr _traceLevels; const std::shared_ptr _observers; const std::shared_ptr _replica; // The replica. const std::shared_ptr _replicaProxy; // A proxy to the individual replica. const int _id; // My node id. const std::map> _nodes; // The nodes indexed by their id. const std::map> _nodesOneway; // The nodes indexed by their id (as oneway proxies). const std::chrono::seconds _masterTimeout; const std::chrono::seconds _electionTimeout; const std::chrono::seconds _mergeTimeout; NodeState _state; int _updateCounter; int _coord; // Id of the coordinator. std::string _group; // My group id. std::set _up; // Set of nodes in my group. std::set _invitesIssued; // The issued invitations. std::set _invitesAccepted; // The accepted invitations. unsigned int _max; // The highest group count I've seen. Ice::Long _generation; // The current generation (or -1 if not set). std::shared_ptr _coordinatorProxy; bool _destroy; IceUtil::TimerTaskPtr _mergeTask; IceUtil::TimerTaskPtr _timeoutTask; IceUtil::TimerTaskPtr _checkTask; IceUtil::TimerTaskPtr _mergeContinueTask; mutable std::recursive_mutex _mutex; std::condition_variable_any _condVar; }; class FinishUpdateHelper { public: FinishUpdateHelper(std::shared_ptr node) : _node(std::move(node)) { } ~FinishUpdateHelper() { if(_node) { _node->finishUpdate(); } } private: const std::shared_ptr _node; }; class CachedReadHelper { public: CachedReadHelper(std::shared_ptr node, const char* file, int line) : _node(std::move(node)) { if(_node) { _master = _node->startCachedRead(_generation, file, line); } } ~CachedReadHelper() { if(_node) { _node->finishUpdate(); } } std::shared_ptr getMaster() const { return _master; } Ice::Long generation() const { return _generation; } bool observerPrecondition(Ice::Long generation) const { return generation == _generation && _master; } private: const std::shared_ptr _node; std::shared_ptr _master; Ice::Long _generation; }; class ObserverUpdateHelper { public: ObserverUpdateHelper(std::shared_ptr node, Ice::Long generation, const char* file, int line) : _node(std::move(node)) { if(_node) { _node->startObserverUpdate(generation, file, line); } } ~ObserverUpdateHelper() { if(_node) { _node->finishUpdate(); } } private: const std::shared_ptr _node; }; } #endif // NODE_I_H