summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TransientTopicManagerI.cpp
blob: 0eaeb00b4bab6209168e5c19da432b3534da2cf4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#include <IceStorm/TransientTopicManagerI.h>
#include <IceStorm/TransientTopicI.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/Instance.h>
#include <IceStorm/Subscriber.h>

#include <Ice/Ice.h>

#include <functional>

using namespace IceStorm;
using namespace std;

//
// Builds the manager around the shared Instance handle it is given; the
// handle is moved into _instance.
//
TransientTopicManagerImpl::TransientTopicManagerImpl(shared_ptr<Instance> instance)
    : _instance(std::move(instance))
{
}

//
// Creates a topic called `name`, registers its servant with the topic
// adapter, records it in _topics and returns a proxy to it.
//
// Throws TopicExists if `name` is already present in _topics once
// destroyed topics have been reaped.
//
shared_ptr<TopicPrx>
TransientTopicManagerImpl::create(string name, const Ice::Current&)
{
    lock_guard<mutex> guard(_mutex);

    // reap() erases destroyed topics from _topics, so they no longer count
    // as existing in the check below.
    reap();

    if(_topics.count(name) != 0)
    {
        throw TopicExists(name);
    }

    Ice::Identity identity = IceStormInternal::nameToIdentity(_instance, name);

    // Tracing happens while _mutex is still held.
    auto levels = _instance->traceLevels();
    if(levels->topicMgr > 0)
    {
        Ice::Trace out(levels->logger, levels->topicMgrCat);
        out << "creating new topic \"" << name << "\". id: "
            << _instance->communicator()->identityToString(identity);
    }

    // Build the servant that implements the topic.
    auto servant = TransientTopicImpl::create(_instance, name, identity);

    // Register the servant under the identity derived from the topic name,
    // then remember it so that retrieve()/retrieveAll() can find it.
    auto proxy = Ice::uncheckedCast<TopicPrx>(_instance->topicAdapter()->add(servant, identity));
    _topics.emplace(name, servant);
    return proxy;
}

//
// Returns a proxy for the topic called `name`.
//
// Throws NoSuchTopic if `name` is not present in _topics once destroyed
// topics have been reaped.
//
shared_ptr<TopicPrx>
TransientTopicManagerImpl::retrieve(string name, const Ice::Current&)
{
    lock_guard<mutex> guard(_mutex);

    reap();

    auto entry = _topics.find(name);
    if(entry != _topics.end())
    {
        // The proxy is built from the identity stored on the topic rather
        // than recomputed from the name: the identity may be either
        // "<instanceName>/<topic name>" or, for topics created with
        // pre-3.2 IceStorm, "/<topic name>".
        return Ice::uncheckedCast<TopicPrx>(_instance->topicAdapter()->createProxy(entry->second->id()));
    }

    throw NoSuchTopic(name);
}

//
// Returns a dictionary mapping every topic name currently held in _topics
// (after reaping destroyed topics) to a proxy for that topic.
//
TopicDict
TransientTopicManagerImpl::retrieveAll(const Ice::Current&)
{
    lock_guard<mutex> guard(_mutex);

    reap();

    TopicDict result;
    for(const auto& entry : _topics)
    {
        //
        // The proxy is built from the identity stored on the topic rather
        // than recomputed from the name: the identity may be either
        // "<instanceName>/topic.<topicname>" or, for topics created with
        // pre-3.2 IceStorm, "/<topicname>".
        //
        auto proxy = Ice::uncheckedCast<TopicPrx>(_instance->topicAdapter()->createProxy(entry.second->id()));
        result.emplace(entry.first, std::move(proxy));
    }

    return result;
}

//
// Always returns a null node proxy: this implementation reports no
// replica node.
//
shared_ptr<IceStormElection::NodePrx>
TransientTopicManagerImpl::getReplicaNode(const Ice::Current&) const
{
    return {};
}

void
TransientTopicManagerImpl::reap()
{
    //
    // Must be called called with mutex locked.
    //
    vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
    for(vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
    {
        auto i = _topics.find(*p);
        if(i != _topics.end() && i->second->destroyed())
        {
            auto id = i->second->id();
            auto traceLevels = _instance->traceLevels();
            if(traceLevels->topicMgr > 0)
            {
                Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
                out << "Reaping " << i->first;
            }

            try
            {
                _instance->topicAdapter()->remove(id);
            }
            catch(const Ice::ObjectAdapterDeactivatedException&)
            {
                // Ignore
            }

            _topics.erase(i);
        }
    }
}

void
TransientTopicManagerImpl::shutdown()
{
    lock_guard<mutex> lg(_mutex);

    for(const auto& topic : _topics)
    {
        topic.second->shutdown();
    }
}