// path: root/cpp/src/IceGrid/Topics.h
// blob: 48d60c2df7a3469ff1b255376587260fbdffd32f (plain)
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef ICEGRID_TOPICS_H
#define ICEGRID_TOPICS_H

#include <IceStorm/IceStorm.h>
#include <IceGrid/Internal.h>
#include <IceGrid/Registry.h>
#include <set>

namespace IceGrid
{

//
// Abstract base class for the observer topics declared in this header. It stores a set of IceStorm topic
// proxies keyed by encoding version, the untyped publisher proxies, two serial numbers and the bookkeeping
// used to wait for subscribers. Derived classes must implement initObserver().
//
// NOTE(review): the member functions are defined outside this header; comments on their behavior below
// are inferred from names and signatures only and should be confirmed against the implementation.
//
class ObserverTopic
{
public:

    // Arguments: the IceStorm topic manager proxy, a string (presumably the topic name) and an optional
    // long long (presumably the initial database serial, see _dbSerial) -- TODO confirm.
    ObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::string&, long long = 0);
    virtual ~ObserverTopic() = default;

    // Subscription management for an observer proxy; the optional string presumably names the subscriber.
    int subscribe(const std::shared_ptr<Ice::ObjectPrx>&, const std::string& = std::string());
    void unsubscribe(const std::shared_ptr<Ice::ObjectPrx>&, const std::string& = std::string());
    void destroy();

    // Presumably records the outcome of an update (name, serial, failure text) reported by a subscriber.
    void receivedUpdate(const std::string&, int, const std::string&);

    // Pure virtual hook implemented by each concrete topic for the given observer proxy.
    virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&) = 0;

    // Presumably blocks until the synced subscribers have acknowledged the given serial -- TODO confirm.
    void waitForSyncedSubscribers(int, const std::string& = std::string());

    int getSerial() const;

protected:

    void addExpectedUpdate(int, const std::string& = std::string());
    void updateSerial(long long = 0);
    Ice::Context getContext(int, long long = 0) const;

    //
    // Returns the base publisher proxies converted to proxies of type T with Ice::uncheckedCast, in the
    // same order as _basePublishers.
    //
    template<typename T> std::vector<std::shared_ptr<T>> getPublishers() const
    {
        std::vector<std::shared_ptr<T>> publishers;
        // The result holds exactly one entry per base publisher, so allocate once up front.
        publishers.reserve(_basePublishers.size());
        for(const auto& publisher : _basePublishers)
        {
            publishers.push_back(Ice::uncheckedCast<T>(publisher));
        }
        return publishers;
    }

    std::shared_ptr<Ice::Logger> _logger;
    // One IceStorm topic proxy per encoding version.
    std::map<Ice::EncodingVersion, std::shared_ptr<IceStorm::TopicPrx>> _topics;
    // Untyped publisher proxies; getPublishers<T>() produces the typed view.
    std::vector<std::shared_ptr<Ice::ObjectPrx>> _basePublishers;
    int _serial;
    long long _dbSerial;

    // Subscriber synchronization state; the int keys are presumably serial numbers -- TODO confirm.
    std::set<std::string> _syncSubscribers;
    std::map<int, std::set<std::string> > _waitForUpdates;
    std::map<int, std::map<std::string, std::string> > _updateFailures;

    // _mutex is mutable so that const member functions can lock it.
    mutable std::mutex _mutex;
    std::condition_variable _condVar;
};

class RegistryObserverTopic : public ObserverTopic
{
public:

    RegistryObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&);

    void registryUp(const RegistryInfo&);
    void registryDown(const std::string&);

    virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&);

private:

    std::vector<std::shared_ptr<RegistryObserverPrx>> _publishers;
    std::map<std::string, RegistryInfo> _registries;
};

//
// Observer topic for node events. Besides deriving from ObserverTopic, this class implements the
// NodeObserver interface itself (the overrides taking an Ice::Current). Instances are obtained through
// the static create() function because the constructor is private.
//
class NodeObserverTopic final : public ObserverTopic, public NodeObserver
{
public:

    // Factory function; takes the IceStorm topic manager proxy and an object adapter.
    // NOTE(review): presumably the adapter is used to register this object as a servant and to set up
    // _externalPublisher -- confirm against the implementation.
    static std::shared_ptr<NodeObserverTopic>
    create(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::shared_ptr<Ice::ObjectAdapter>&);

    // NodeObserver operations.
    void nodeInit(NodeDynamicInfoSeq, const Ice::Current&) override;
    void nodeUp(NodeDynamicInfo, const Ice::Current&) override;
    void nodeDown(std::string, const Ice::Current&) override;
    void updateServer(std::string, ServerDynamicInfo, const Ice::Current&) override;
    void updateAdapter(std::string, AdapterDynamicInfo, const Ice::Current&) override;

    // Read-only accessor for the external publisher proxy; const-qualified because it does not modify
    // the object, so it can also be called through a const reference or pointer.
    const std::shared_ptr<NodeObserverPrx>& getPublisher() const { return _externalPublisher; }

    // Overload of nodeDown without an Ice::Current argument (not part of the NodeObserver overrides).
    void nodeDown(const std::string&);
    void initObserver(const std::shared_ptr<Ice::ObjectPrx>&) override;

    bool isServerEnabled(const std::string&) const;

private:

    NodeObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&);

    const std::shared_ptr<NodeObserverPrx> _externalPublisher;
    std::vector<std::shared_ptr<NodeObserverPrx>> _publishers;
    // Keyed by string; presumably node name and server id respectively -- TODO confirm.
    std::map<std::string, NodeDynamicInfo> _nodes;
    std::map<std::string, bool> _serverStatus;
};

class ApplicationObserverTopic : public ObserverTopic
{
public:

    ApplicationObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<std::string, ApplicationInfo>&, long long);

    int applicationInit(long long, const ApplicationInfoSeq&);
    int applicationAdded(long long, const ApplicationInfo&);
    int applicationRemoved(long long, const std::string&);
    int applicationUpdated(long long, const ApplicationUpdateInfo&);

    virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&);

private:

    std::vector<std::shared_ptr<ApplicationObserverPrx>> _publishers;
    std::map<std::string, ApplicationInfo> _applications;
};

class AdapterObserverTopic : public ObserverTopic
{
public:

    AdapterObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<std::string, AdapterInfo>&, long long);

    int adapterInit(long long, const AdapterInfoSeq&);
    int adapterAdded(long long, const AdapterInfo&);
    int adapterUpdated(long long, const AdapterInfo&);
    int adapterRemoved(long long, const std::string&);

    virtual void initObserver(const std::shared_ptr<Ice::ObjectPrx>&);

private:

    std::vector<std::shared_ptr<AdapterObserverPrx>> _publishers;
    std::map<std::string, AdapterInfo> _adapters;
};

//
// Observer topic for object events. It is constructed from the topic manager proxy, a map of ObjectInfo
// keyed by Ice::Identity and a long long, and it holds typed ObjectObserverPrx publishers plus its own
// map of ObjectInfo.
//
// NOTE(review): the leading long long of the object* notifications is presumably a database serial and
// the int result presumably the topic serial -- confirm against the implementation.
//
class ObjectObserverTopic : public ObserverTopic
{
public:

    ObjectObserverTopic(const std::shared_ptr<IceStorm::TopicManagerPrx>&, const std::map<Ice::Identity, ObjectInfo>&, long long);

    int objectInit(long long, const ObjectInfoSeq&);
    int objectAdded(long long, const ObjectInfo&);
    int objectUpdated(long long, const ObjectInfo&);
    int objectRemoved(long long, const Ice::Identity&);

    // Batch notifications taking a sequence of ObjectInfo; unlike the functions above they take no
    // long long argument.
    int wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq&);
    int wellKnownObjectsRemoved(const ObjectInfoSeq&);

    // Implements ObserverTopic::initObserver. Declared with override so that any drift from the base
    // class signature is reported at compile time.
    void initObserver(const std::shared_ptr<Ice::ObjectPrx>&) override;

private:

    std::vector<std::shared_ptr<ObjectObserverPrx>> _publishers;
    std::map<Ice::Identity, ObjectInfo> _objects;
};

};

#endif