// ********************************************************************** // // Copyright (c) 2003-2015 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #ifndef ICEGRID_TOPICS_H #define ICEGRID_TOPICS_H #include #include #include #include #include namespace IceGrid { class ObserverTopic : public IceUtil::Monitor, public virtual Ice::Object { public: ObserverTopic(const IceStorm::TopicManagerPrx&, const std::string&, Ice::Long = 0); virtual ~ObserverTopic(); int subscribe(const Ice::ObjectPrx&, const std::string& = std::string()); void unsubscribe(const Ice::ObjectPrx&, const std::string& = std::string()); void destroy(); void receivedUpdate(const std::string&, int, const std::string&); virtual void initObserver(const Ice::ObjectPrx&) = 0; void waitForSyncedSubscribers(int, const std::string& = std::string()); int getSerial() const; protected: void addExpectedUpdate(int, const std::string& = std::string()); void waitForSyncedSubscribersNoSync(int, const std::string& = std::string()); void updateSerial(Ice::Long = 0); Ice::Context getContext(int, Ice::Long = 0) const; template std::vector getPublishers() const { std::vector publishers; for(std::vector::const_iterator p = _basePublishers.begin(); p != _basePublishers.end(); ++p) { publishers.push_back(T::uncheckedCast(*p)); } return publishers; } Ice::LoggerPtr _logger; std::map _topics; std::vector _basePublishers; int _serial; Ice::Long _dbSerial; std::set _syncSubscribers; std::map > _waitForUpdates; std::map > _updateFailures; }; typedef IceUtil::Handle ObserverTopicPtr; class RegistryObserverTopic : public ObserverTopic { public: RegistryObserverTopic(const IceStorm::TopicManagerPrx&); void registryUp(const RegistryInfo&); void registryDown(const std::string&); virtual void initObserver(const Ice::ObjectPrx&); private: std::vector _publishers; std::map _registries; }; typedef IceUtil::Handle RegistryObserverTopicPtr; class NodeObserverTopic : public ObserverTopic, public NodeObserver { public: NodeObserverTopic(const IceStorm::TopicManagerPrx&, const Ice::ObjectAdapterPtr&); virtual void nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&); virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&); virtual void nodeDown(const std::string&, const Ice::Current&); virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&); virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&); const NodeObserverPrx& getPublisher() { return _externalPublisher; } void nodeDown(const std::string&); virtual void initObserver(const Ice::ObjectPrx&); private: const NodeObserverPrx _externalPublisher; std::vector _publishers; std::map _nodes; }; typedef IceUtil::Handle NodeObserverTopicPtr; class ApplicationObserverTopic : public ObserverTopic { public: ApplicationObserverTopic(const IceStorm::TopicManagerPrx&, const std::map&, Ice::Long); int applicationInit(Ice::Long, const ApplicationInfoSeq&); int applicationAdded(Ice::Long, const ApplicationInfo&); int applicationRemoved(Ice::Long, const std::string&); int applicationUpdated(Ice::Long, const ApplicationUpdateInfo&); virtual void initObserver(const Ice::ObjectPrx&); private: std::vector _publishers; std::map _applications; }; typedef IceUtil::Handle ApplicationObserverTopicPtr; class AdapterObserverTopic : public ObserverTopic { public: AdapterObserverTopic(const IceStorm::TopicManagerPrx&, const std::map&, Ice::Long); int adapterInit(Ice::Long, const AdapterInfoSeq&); int adapterAdded(Ice::Long, const AdapterInfo&); int adapterUpdated(Ice::Long, const AdapterInfo&); int adapterRemoved(Ice::Long, const std::string&); virtual void initObserver(const Ice::ObjectPrx&); private: std::vector _publishers; std::map _adapters; }; typedef IceUtil::Handle AdapterObserverTopicPtr; class ObjectObserverTopic : public ObserverTopic { public: ObjectObserverTopic(const IceStorm::TopicManagerPrx&, const std::map&, Ice::Long); int objectInit(Ice::Long, const ObjectInfoSeq&); int objectAdded(Ice::Long, const ObjectInfo&); int objectUpdated(Ice::Long, const ObjectInfo&); int objectRemoved(Ice::Long, const Ice::Identity&); int wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq&); int wellKnownObjectsRemoved(const ObjectInfoSeq&); virtual void initObserver(const Ice::ObjectPrx&); private: std::vector _publishers; std::map _objects; }; typedef IceUtil::Handle ObjectObserverTopicPtr; }; #endif