summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Instance.h
blob: 5f1e1fe173944bfe5c164551e45c23b7514d2e4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#ifndef INSTANCE_H
#define INSTANCE_H

#include <Ice/CommunicatorF.h>
#include <Ice/ObjectAdapterF.h>
#include <Ice/PropertiesF.h>
#include <IceStorm/Election.h>
#include <IceStorm/Instrumentation.h>
#include <IceStorm/Util.h>

namespace IceStormElection
{

class Observers;
class NodeI;

}

namespace IceStorm
{

class TraceLevels;

class TopicReaper
{
public:

    void add(const std::string&);
    std::vector<std::string> consumeReapedTopics();

private:

    std::vector<std::string> _topics;

    std::mutex _mutex;
};

class Instance
{
public:

    enum SendQueueSizeMaxPolicy
    {
        RemoveSubscriber,
        DropEvents
    };

    Instance(const std::string&, const std::string&, std::shared_ptr<Ice::Communicator>,
             std::shared_ptr<Ice::ObjectAdapter>, std::shared_ptr<Ice::ObjectAdapter>,
             std::shared_ptr<Ice::ObjectAdapter> = nullptr,
             std::shared_ptr<IceStormElection::NodePrx> = nullptr);

    virtual ~Instance();

    void setNode(std::shared_ptr<IceStormElection::NodeI>);

    std::string instanceName() const;
    std::string serviceName() const;
    std::shared_ptr<Ice::Communicator> communicator() const;
    std::shared_ptr<Ice::Properties> properties() const;
    std::shared_ptr<Ice::ObjectAdapter> publishAdapter() const;
    std::shared_ptr<Ice::ObjectAdapter> topicAdapter() const;
    std::shared_ptr<Ice::ObjectAdapter> nodeAdapter() const;
    std::shared_ptr<IceStormElection::Observers> observers() const;
    std::shared_ptr<IceStormElection::NodeI> node() const;
    std::shared_ptr<IceStormElection::NodePrx> nodeProxy() const;
    std::shared_ptr<TraceLevels> traceLevels() const;
    IceUtil::TimerPtr timer() const;
    std::shared_ptr<Ice::ObjectPrx> topicReplicaProxy() const;
    std::shared_ptr<Ice::ObjectPrx> publisherReplicaProxy() const;
    std::shared_ptr<IceStorm::Instrumentation::TopicManagerObserver> observer() const;
    std::shared_ptr<TopicReaper> topicReaper() const;

    std::chrono::seconds discardInterval() const;
    std::chrono::milliseconds flushInterval() const;
    std::chrono::milliseconds sendTimeout() const;
    int sendQueueSizeMax() const;
    SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const;

    void shutdown();
    virtual void destroy();

private:

    const std::string _instanceName;
    const std::string _serviceName;
    const std::shared_ptr<Ice::Communicator> _communicator;
    const std::shared_ptr<Ice::ObjectAdapter> _publishAdapter;
    const std::shared_ptr<Ice::ObjectAdapter> _topicAdapter;
    const std::shared_ptr<Ice::ObjectAdapter> _nodeAdapter;
    const std::shared_ptr<IceStormElection::NodePrx> _nodeProxy;
    const std::shared_ptr<TraceLevels> _traceLevels;
    const std::chrono::seconds _discardInterval;
    const std::chrono::milliseconds _flushInterval;
    const std::chrono::milliseconds _sendTimeout;
    const int _sendQueueSizeMax;
    const SendQueueSizeMaxPolicy _sendQueueSizeMaxPolicy;
    const std::shared_ptr<Ice::ObjectPrx> _topicReplicaProxy;
    const std::shared_ptr<Ice::ObjectPrx> _publisherReplicaProxy;
    const std::shared_ptr<TopicReaper> _topicReaper;
    std::shared_ptr<IceStormElection::NodeI> _node;
    std::shared_ptr<IceStormElection::Observers> _observers;
    IceUtil::TimerPtr _timer;
    std::shared_ptr<IceStorm::Instrumentation::TopicManagerObserver> _observer;
};

using SubscriberMapRWCursor = IceDB::ReadWriteCursor<SubscriberRecordKey,
                                                     SubscriberRecord,
                                                     IceDB::IceContext,
                                                     Ice::OutputStream>;

class PersistentInstance final : public Instance
{
public:

    PersistentInstance(const std::string&, const std::string&, std::shared_ptr<Ice::Communicator>,
                       std::shared_ptr<Ice::ObjectAdapter>, std::shared_ptr<Ice::ObjectAdapter>,
                       std::shared_ptr<Ice::ObjectAdapter> = nullptr,
                       std::shared_ptr<IceStormElection::NodePrx> = nullptr);

    const IceDB::Env& dbEnv() const { return _dbEnv; }
    LLUMap lluMap() const { return _lluMap; }
    SubscriberMap subscriberMap() const { return _subscriberMap; }

    void destroy() override;

private:

    IceUtilInternal::FileLock _dbLock;
    IceDB::Env _dbEnv;
    LLUMap _lluMap;
    SubscriberMap _subscriberMap;
};

} // End namespace IceStorm

#endif