// // Copyright (c) ZeroC, Inc. All rights reserved. // #ifndef ICEGRID_TOPICS_H #define ICEGRID_TOPICS_H #include #include #include #include namespace IceGrid { class ObserverTopic { public: ObserverTopic(const std::shared_ptr&, const std::string&, long long = 0); virtual ~ObserverTopic() = default; int subscribe(const std::shared_ptr&, const std::string& = std::string()); void unsubscribe(const std::shared_ptr&, const std::string& = std::string()); void destroy(); void receivedUpdate(const std::string&, int, const std::string&); virtual void initObserver(const std::shared_ptr&) = 0; void waitForSyncedSubscribers(int, const std::string& = std::string()); int getSerial() const; protected: void addExpectedUpdate(int, const std::string& = std::string()); void updateSerial(long long = 0); Ice::Context getContext(int, long long = 0) const; template std::vector> getPublishers() const { std::vector> publishers; for(const auto& publisher :_basePublishers) { publishers.push_back(Ice::uncheckedCast(publisher)); } return publishers; } std::shared_ptr _logger; std::map> _topics; std::vector> _basePublishers; int _serial; long long _dbSerial; std::set _syncSubscribers; std::map > _waitForUpdates; std::map > _updateFailures; mutable std::mutex _mutex; std::condition_variable _condVar; }; class RegistryObserverTopic : public ObserverTopic { public: RegistryObserverTopic(const std::shared_ptr&); void registryUp(const RegistryInfo&); void registryDown(const std::string&); virtual void initObserver(const std::shared_ptr&); private: std::vector> _publishers; std::map _registries; }; class NodeObserverTopic final : public ObserverTopic, public NodeObserver { public: static std::shared_ptr create(const std::shared_ptr&, const std::shared_ptr&); void nodeInit(NodeDynamicInfoSeq, const Ice::Current&) override; void nodeUp(NodeDynamicInfo, const Ice::Current&) override; void nodeDown(std::string, const Ice::Current&) override; void updateServer(std::string, ServerDynamicInfo, const Ice::Current&) override; void updateAdapter(std::string, AdapterDynamicInfo, const Ice::Current&) override; const std::shared_ptr& getPublisher() { return _externalPublisher; } void nodeDown(const std::string&); void initObserver(const std::shared_ptr&) override; bool isServerEnabled(const std::string&) const; private: NodeObserverTopic(const std::shared_ptr&); const std::shared_ptr _externalPublisher; std::vector> _publishers; std::map _nodes; std::map _serverStatus; }; class ApplicationObserverTopic : public ObserverTopic { public: ApplicationObserverTopic(const std::shared_ptr&, const std::map&, long long); int applicationInit(long long, const ApplicationInfoSeq&); int applicationAdded(long long, const ApplicationInfo&); int applicationRemoved(long long, const std::string&); int applicationUpdated(long long, const ApplicationUpdateInfo&); virtual void initObserver(const std::shared_ptr&); private: std::vector> _publishers; std::map _applications; }; class AdapterObserverTopic : public ObserverTopic { public: AdapterObserverTopic(const std::shared_ptr&, const std::map&, long long); int adapterInit(long long, const AdapterInfoSeq&); int adapterAdded(long long, const AdapterInfo&); int adapterUpdated(long long, const AdapterInfo&); int adapterRemoved(long long, const std::string&); virtual void initObserver(const std::shared_ptr&); private: std::vector> _publishers; std::map _adapters; }; class ObjectObserverTopic : public ObserverTopic { public: ObjectObserverTopic(const std::shared_ptr&, const std::map&, long long); int objectInit(long long, const ObjectInfoSeq&); int objectAdded(long long, const ObjectInfo&); int objectUpdated(long long, const ObjectInfo&); int objectRemoved(long long, const Ice::Identity&); int wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq&); int wellKnownObjectsRemoved(const ObjectInfoSeq&); virtual void initObserver(const std::shared_ptr&); private: std::vector> _publishers; std::map _objects; }; }; #endif