diff options
Diffstat (limited to 'cpp/src/IceGrid/Topics.h')
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 116 |
1 files changed, 57 insertions, 59 deletions
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index 01288d76715..48d60c2df7a 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -5,7 +5,6 @@ #ifndef ICEGRID_TOPICS_H #define ICEGRID_TOPICS_H -#include <IceUtil/Mutex.h> #include <IceStorm/IceStorm.h> #include <IceGrid/Internal.h> #include <IceGrid/Registry.h> @@ -14,20 +13,20 @@ namespace IceGrid { -class ObserverTopic : public IceUtil::Monitor<IceUtil::Mutex>, public virtual Ice::Object +class ObserverTopic { public: - ObserverTopic(const IceStorm::TopicManagerPrx&, const std::string&, Ice::Long = 0); - virtual ~ObserverTopic(); + ObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::string&, long long = 0); + virtual ~ObserverTopic() = default; - int subscribe(const Ice::ObjectPrx&, const std::string& = std::string()); - void unsubscribe(const Ice::ObjectPrx&, const std::string& = std::string()); + int subscribe(const std::shared_ptr<Ice::ObjectPrx>&, const std::string& = std::string()); + void unsubscribe(const std::shared_ptr<Ice::ObjectPrx>&, const std::string& = std::string()); void destroy(); void receivedUpdate(const std::string&, int, const std::string&); - virtual void initObserver(const Ice::ObjectPrx&) = 0; + virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&) = 0; void waitForSyncedSubscribers(int, const std::string& = std::string()); @@ -36,140 +35,139 @@ public: protected: void addExpectedUpdate(int, const std::string& = std::string()); - void waitForSyncedSubscribersNoSync(int, const std::string& = std::string()); - void updateSerial(Ice::Long = 0); - Ice::Context getContext(int, Ice::Long = 0) const; + void updateSerial(long long = 0); + Ice::Context getContext(int, long long = 0) const; - template<typename T> std::vector<T> getPublishers() const + template<typename T> std::vector<std::shared_ptr<T>> getPublishers() const { - std::vector<T> publishers; - for(std::vector<Ice::ObjectPrx>::const_iterator p = _basePublishers.begin(); p != _basePublishers.end(); ++p) + std::vector<std::shared_ptr<T>> publishers; + for(const auto& publisher :_basePublishers) { - publishers.push_back(T::uncheckedCast(*p)); + publishers.push_back(Ice::uncheckedCast<T>(publisher)); } return publishers; } - Ice::LoggerPtr _logger; - std::map<Ice::EncodingVersion, IceStorm::TopicPrx> _topics; - std::vector<Ice::ObjectPrx> _basePublishers; + std::shared_ptr<Ice::Logger> _logger; + std::map<Ice::EncodingVersion, std::shared_ptr<IceStorm::TopicPrx>> _topics; + std::vector<std::shared_ptr<Ice::ObjectPrx>> _basePublishers; int _serial; - Ice::Long _dbSerial; + long long _dbSerial; std::set<std::string> _syncSubscribers; std::map<int, std::set<std::string> > _waitForUpdates; std::map<int, std::map<std::string, std::string> > _updateFailures; + + mutable std::mutex _mutex; + std::condition_variable _condVar; }; -typedef IceUtil::Handle<ObserverTopic> ObserverTopicPtr; class RegistryObserverTopic : public ObserverTopic { public: - RegistryObserverTopic(const IceStorm::TopicManagerPrx&); + RegistryObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&); void registryUp(const RegistryInfo&); void registryDown(const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&); private: - std::vector<RegistryObserverPrx> _publishers; + std::vector<std::shared_ptr<RegistryObserverPrx>> _publishers; std::map<std::string, RegistryInfo> _registries; }; -typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; -class NodeObserverTopic : public ObserverTopic, public NodeObserver +class NodeObserverTopic final : public ObserverTopic, public NodeObserver { public: - NodeObserverTopic(const IceStorm::TopicManagerPrx&, const Ice::ObjectAdapterPtr&); + static std::shared_ptr<NodeObserverTopic> + create(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::shared_ptr<Ice::ObjectAdapter>&); - virtual void nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&); - virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&); - virtual void nodeDown(const std::string&, const Ice::Current&); - virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&); - virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&); + void nodeInit(NodeDynamicInfoSeq, const Ice::Current&) override; + void nodeUp(NodeDynamicInfo, const Ice::Current&) override; + void nodeDown(std::string, const Ice::Current&) override; + void updateServer(std::string, ServerDynamicInfo, const Ice::Current&) override; + void updateAdapter(std::string, AdapterDynamicInfo, const Ice::Current&) override; - const NodeObserverPrx& getPublisher() { return _externalPublisher; } + const std::shared_ptr<NodeObserverPrx>& getPublisher() { return _externalPublisher; } void nodeDown(const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + void initObserver(const std::shared_ptr<Ice::ObjectPrx>&) override; bool isServerEnabled(const std::string&) const; private: - const NodeObserverPrx _externalPublisher; - std::vector<NodeObserverPrx> _publishers; + NodeObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&); + + const std::shared_ptr<NodeObserverPrx> _externalPublisher; + std::vector<std::shared_ptr<NodeObserverPrx>> _publishers; std::map<std::string, NodeDynamicInfo> _nodes; std::map<std::string, bool> _serverStatus; }; -typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr; class ApplicationObserverTopic : public ObserverTopic { public: - ApplicationObserverTopic(const IceStorm::TopicManagerPrx&, const std::map<std::string, ApplicationInfo>&, Ice::Long); + ApplicationObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<std::string, ApplicationInfo>&, long long); - int applicationInit(Ice::Long, const ApplicationInfoSeq&); - int applicationAdded(Ice::Long, const ApplicationInfo&); - int applicationRemoved(Ice::Long, const std::string&); - int applicationUpdated(Ice::Long, const ApplicationUpdateInfo&); + int applicationInit(long long, const ApplicationInfoSeq&); + int applicationAdded(long long, const ApplicationInfo&); + int applicationRemoved(long long, const std::string&); + int applicationUpdated(long long, const ApplicationUpdateInfo&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&); private: - std::vector<ApplicationObserverPrx> _publishers; + std::vector<std::shared_ptr<ApplicationObserverPrx>> _publishers; std::map<std::string, ApplicationInfo> _applications; }; -typedef IceUtil::Handle<ApplicationObserverTopic> ApplicationObserverTopicPtr; class AdapterObserverTopic : public ObserverTopic { public: - AdapterObserverTopic(const IceStorm::TopicManagerPrx&, const std::map<std::string, AdapterInfo>&, Ice::Long); + AdapterObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<std::string, AdapterInfo>&, long long); - int adapterInit(Ice::Long, const AdapterInfoSeq&); - int adapterAdded(Ice::Long, const AdapterInfo&); - int adapterUpdated(Ice::Long, const AdapterInfo&); - int adapterRemoved(Ice::Long, const std::string&); + int adapterInit(long long, const AdapterInfoSeq&); + int adapterAdded(long long, const AdapterInfo&); + int adapterUpdated(long long, const AdapterInfo&); + int adapterRemoved(long long, const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&); private: - std::vector<AdapterObserverPrx> _publishers; + std::vector<std::shared_ptr<AdapterObserverPrx>> _publishers; std::map<std::string, AdapterInfo> _adapters; }; -typedef IceUtil::Handle<AdapterObserverTopic> AdapterObserverTopicPtr; class ObjectObserverTopic : public ObserverTopic { public: - ObjectObserverTopic(const IceStorm::TopicManagerPrx&, const std::map<Ice::Identity, ObjectInfo>&, Ice::Long); + ObjectObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<Ice::Identity, ObjectInfo>&, long long); - int objectInit(Ice::Long, const ObjectInfoSeq&); - int objectAdded(Ice::Long, const ObjectInfo&); - int objectUpdated(Ice::Long, const ObjectInfo&); - int objectRemoved(Ice::Long, const Ice::Identity&); + int objectInit(long long, const ObjectInfoSeq&); + int objectAdded(long long, const ObjectInfo&); + int objectUpdated(long long, const ObjectInfo&); + int objectRemoved(long long, const Ice::Identity&); int wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq&); int wellKnownObjectsRemoved(const ObjectInfoSeq&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&); private: - std::vector<ObjectObserverPrx> _publishers; + std::vector<std::shared_ptr<ObjectObserverPrx>> _publishers; std::map<Ice::Identity, ObjectInfo> _objects; }; -typedef IceUtil::Handle<ObjectObserverTopic> ObjectObserverTopicPtr; }; |