diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 16:26:44 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:59:30 -0500 |
commit | 92a6531e409f2691d82591e185a92299d415fc0f (patch) | |
tree | 60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceStorm/TopicManagerI.cpp | |
parent | Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff) | |
download | ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2 ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz ice-92a6531e409f2691d82591e185a92299d415fc0f.zip |
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 328 |
1 files changed, 161 insertions, 167 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 194a896a934..7215b5c7e21 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -22,32 +22,32 @@ namespace { void -logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex) +logError(const shared_ptr<Ice::Communicator>& com, const IceDB::LMDBException& ex) { Ice::Error error(com->getLogger()); error << "LMDB error: " << ex; } -class TopicManagerI : public TopicManagerInternal +class TopicManagerI final : public TopicManagerInternal { public: - TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) : - _instance(instance), _impl(impl) + TopicManagerI(shared_ptr<PersistentInstance> instance, shared_ptr<TopicManagerImpl> impl) : + _instance(move(instance)), _impl(move(impl)) { } - virtual TopicPrx create(const string& id, const Ice::Current&) + shared_ptr<TopicPrx> create(string id, const Ice::Current&) override { while(true) { - Ice::Long generation; - TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__); + long long generation; + auto master = getMaster(generation, __FILE__, __LINE__); if(master) { try { - return master->create(id); + return master->create(move(id)); } catch(const Ice::ConnectFailedException&) { @@ -63,26 +63,26 @@ public: else { FinishUpdateHelper unlock(_instance->node()); - return _impl->create(id); + return _impl->create(move(id)); } } } - virtual TopicPrx retrieve(const string& id, const Ice::Current&) const + shared_ptr<TopicPrx> retrieve(string id, const Ice::Current&) override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); - return _impl->retrieve(id); + return _impl->retrieve(move(id)); } - virtual TopicDict retrieveAll(const Ice::Current&) const + TopicDict retrieveAll(const Ice::Current&) override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->retrieveAll(); } - virtual NodePrx getReplicaNode(const Ice::Current&) const + shared_ptr<NodePrx> getReplicaNode(const Ice::Current&) const override { // This doesn't require the replication to be running. return _instance->nodeProxy(); @@ -90,49 +90,49 @@ public: private: - TopicManagerPrx getMaster(Ice::Long& generation, const char* file, int line) const + shared_ptr<TopicManagerPrx> getMaster(long long& generation, const char* file, int line) const { - NodeIPtr node = _instance->node(); + auto node = _instance->node(); if(node) { - return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line)); + return Ice::uncheckedCast<TopicManagerPrx>(node->startUpdate(generation, file, line)); } else { - return TopicManagerPrx(); + return nullptr; } } - const PersistentInstancePtr _instance; - const TopicManagerImplPtr _impl; + const shared_ptr<PersistentInstance> _instance; + const shared_ptr<TopicManagerImpl> _impl; }; -class ReplicaObserverI : public ReplicaObserver +class ReplicaObserverI final : public ReplicaObserver { public: - ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) : - _instance(instance), - _impl(impl) + ReplicaObserverI(shared_ptr<PersistentInstance> instance, shared_ptr<TopicManagerImpl> impl) : + _instance(move(instance)), + _impl(move(impl)) { } - virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&) + void init(LogUpdate llu, TopicContentSeq content, const Ice::Current&) override { - NodeIPtr node = _instance->node(); + auto node = _instance->node(); if(node) { node->checkObserverInit(llu.generation); } - _impl->observerInit(llu, content); + _impl->observerInit(move(llu), move(content)); } - virtual void createTopic(const LogUpdate& llu, const string& name, const Ice::Current&) + void createTopic(LogUpdate llu, string name, const Ice::Current&) override { try { ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); - _impl->observerCreateTopic(llu, name); + _impl->observerCreateTopic(llu, move(name)); } catch(const ObserverInconsistencyException& e) { @@ -143,12 +143,12 @@ public: } } - virtual void destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&) + void destroyTopic(LogUpdate llu, string name, const Ice::Current&) override { try { ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); - _impl->observerDestroyTopic(llu, name); + _impl->observerDestroyTopic(llu, move(name)); } catch(const ObserverInconsistencyException& e) { @@ -159,13 +159,12 @@ public: } } - virtual void addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec, - const Ice::Current&) + void addSubscriber(LogUpdate llu, string name, SubscriberRecord rec, const Ice::Current&) override { try { ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); - _impl->observerAddSubscriber(llu, name, rec); + _impl->observerAddSubscriber(llu, move(name), move(rec)); } catch(const ObserverInconsistencyException& e) { @@ -176,13 +175,12 @@ public: } } - virtual void removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id, - const Ice::Current&) + void removeSubscriber(LogUpdate llu, string name, Ice::IdentitySeq id, const Ice::Current&) override { try { ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); - _impl->observerRemoveSubscriber(llu, name, id); + _impl->observerRemoveSubscriber(llu, move(name), move(id)); } catch(const ObserverInconsistencyException& e) { @@ -195,110 +193,109 @@ public: private: - const PersistentInstancePtr _instance; - const TopicManagerImplPtr _impl; + const shared_ptr<PersistentInstance> _instance; + const shared_ptr<TopicManagerImpl> _impl; }; -class TopicManagerSyncI : public TopicManagerSync +class TopicManagerSyncI final : public TopicManagerSync { public: - TopicManagerSyncI(const TopicManagerImplPtr& impl) : - _impl(impl) + TopicManagerSyncI(shared_ptr<TopicManagerImpl> impl) : + _impl(move(impl)) { } - virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&) + void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&) override { _impl->getContent(llu, content); } private: - const TopicManagerImplPtr _impl; + const shared_ptr<TopicManagerImpl> _impl; }; } -TopicManagerImpl::TopicManagerImpl(const PersistentInstancePtr& instance) : - _instance(instance), - _lluMap(instance->lluMap()), - _subscriberMap(instance->subscriberMap()) +shared_ptr<TopicManagerImpl> +TopicManagerImpl::create(const std::shared_ptr<PersistentInstance>& instance) { - try + shared_ptr<TopicManagerImpl> manager(new TopicManagerImpl(instance)); + + if(instance->observer()) { - __setNoDelete(true); + instance->observer()->setObserverUpdater(manager); + } - if(_instance->observer()) - { - _instance->observer()->setObserverUpdater(this); - } + manager->_managerImpl = make_shared<TopicManagerI>(instance, manager); - // TODO: If we want to improve the performance of the - // non-replicated case we could allocate a null-topic manager impl - // here. - _managerImpl = new TopicManagerI(instance, this); + // If there is no node adapter we don't need to start the + // observer, nor sync since we're not replicating. + if(instance->nodeAdapter()) + { + auto observerImpl = make_shared<ReplicaObserverI>(instance, manager); + manager->_observer = instance->nodeAdapter()->addWithUUID(observerImpl); + auto syncImpl = make_shared<TopicManagerSyncI>(manager); + manager->_sync = instance->nodeAdapter()->addWithUUID(syncImpl); + } - // If there is no node adapter we don't need to start the - // observer, nor sync since we're not replicating. - if(_instance->nodeAdapter()) - { - _observerImpl = new ReplicaObserverI(instance, this); - _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl); - _syncImpl = new TopicManagerSyncI(this); - _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl); - } + return manager; +} - { - IceDB::ReadWriteTxn txn(_instance->dbEnv()); +TopicManagerImpl::TopicManagerImpl(shared_ptr<PersistentInstance> instance) : + _instance(move(instance)), + _lluMap(_instance->lluMap()), + _subscriberMap(_instance->subscriberMap()) +{ + try + { + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - // Ensure that the llu counter is present in the log. - LogUpdate empty = {0, 0}; - _instance->lluMap().put(txn, lluDbKey, empty); + // Ensure that the llu counter is present in the log. + LogUpdate empty = {0, 0}; + _instance->lluMap().put(txn, lluDbKey, empty); - // Recreate each of the topics. - SubscriberRecordKey k; - SubscriberRecord v; + // Recreate each of the topics. + SubscriberRecordKey k; + SubscriberRecord v; - SubscriberMapRWCursor cursor(_subscriberMap, txn); - if(cursor.get(k, v, MDB_FIRST)) + SubscriberMapRWCursor cursor(_subscriberMap, txn); + if(cursor.get(k, v, MDB_FIRST)) + { + bool moreTopics = false; + do { - bool moreTopics = false; - do - { - // This record has to be a place holder record, otherwise - // there is a database bug. - assert(k.id.name.empty() && k.id.category.empty()); + // This record has to be a place holder record, otherwise + // there is a database bug. + assert(k.id.name.empty() && k.id.category.empty()); - Ice::Identity topic = k.topic; + Ice::Identity topic = k.topic; - SubscriberRecordSeq content; - while((moreTopics = cursor.get(k, v, MDB_NEXT)) == true && k.topic == topic) - { - content.push_back(v); - } - - string name = identityToTopicName(topic); - installTopic(name, topic, false, content); - } while(moreTopics); - } + SubscriberRecordSeq content; + while((moreTopics = cursor.get(k, v, MDB_NEXT)) == true && k.topic == topic) + { + content.push_back(v); + } - txn.commit(); + string name = identityToTopicName(topic); + installTopic(name, topic, false, content); + } while(moreTopics); } + + txn.commit(); } - catch(...) + catch(const std::exception&) { shutdown(); - __setNoDelete(false); throw; } - __setNoDelete(false); } -TopicPrx +shared_ptr<TopicPrx> TopicManagerImpl::create(const string& name) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); reap(); if(_topics.find(name) != _topics.end()) @@ -336,15 +333,14 @@ TopicManagerImpl::create(const string& name) return installTopic(name, id, true); } -TopicPrx -TopicManagerImpl::retrieve(const string& name) const +shared_ptr<TopicPrx> +TopicManagerImpl::retrieve(const string& name) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this); - This->reap(); + reap(); - map<string, TopicImplPtr>::const_iterator p = _topics.find(name); + auto p = _topics.find(name); if(p == _topics.end()) { throw NoSuchTopic(name); @@ -354,17 +350,16 @@ TopicManagerImpl::retrieve(const string& name) const } TopicDict -TopicManagerImpl::retrieveAll() const +TopicManagerImpl::retrieveAll() { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this); - This->reap(); + reap(); TopicDict all; - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + for(auto p = _topics.begin(); p != _topics.end(); ++p) { - all.insert(TopicDict::value_type(p->first, p->second->proxy())); + all.insert({ p->first, p->second->proxy() }); } return all; @@ -373,19 +368,19 @@ TopicManagerImpl::retrieveAll() const void TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& content) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); out << "init"; - for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + for(const auto& c : content) { - out << " topic: " << _instance->communicator()->identityToString(p->id) << " subscribers: "; - for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) + out << " topic: " << _instance->communicator()->identityToString(c.id) << " subscribers: "; + for(auto q = c.records.cbegin(); q != c.records.cend(); ++q) { - if(q != p->records.begin()) + if(q != c.records.begin()) { out << ","; } @@ -409,22 +404,23 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont _subscriberMap.clear(txn); for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + for(const auto& c : content) { SubscriberRecordKey srkey; - srkey.topic = p->id; + srkey.topic = c.id; SubscriberRecord rec; rec.link = false; rec.cost = 0; _subscriberMap.put(txn, srkey, rec); - for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) + for(const auto& record : c.records) { SubscriberRecordKey key; - key.topic = p->id; - key.id = q->id; + key.topic = c.id; + key.id = record.id; - _subscriberMap.put(txn, key, *q); + _subscriberMap.put(txn, key, record); } } txn.commit(); @@ -440,7 +436,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont // runs through the init list and either adds the ones that don't // exist, or updates those that do. - map<string, TopicImplPtr>::iterator p = _topics.begin(); + auto p = _topics.begin(); while(p != _topics.end()) { TopicContentSeq::const_iterator q; @@ -471,10 +467,10 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont // Now run through the contents updating the topics that do exist, // and creating those that do not. - for(TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q) + for(auto q = content.cbegin(); q != content.cend(); ++q) { string name = identityToTopicName(q->id); - map<string, TopicImplPtr>::const_iterator r = _topics.find(name); + auto r = _topics.find(name); if(r == _topics.end()) { installTopic(name, q->id, true, q->records); @@ -491,7 +487,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont void TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); Ice::Identity id = nameToIdentity(_instance, name); try @@ -526,9 +522,9 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) void TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - map<string, TopicImplPtr>::iterator q = _topics.find(name); + auto q = _topics.find(name); if(q == _topics.end()) { throw ObserverInconsistencyException("no topic: " + name); @@ -541,11 +537,11 @@ TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name) void TopicManagerImpl::observerAddSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& record) { - TopicImplPtr topic; + shared_ptr<TopicImpl> topic; { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - map<string, TopicImplPtr>::iterator q = _topics.find(name); + auto q = _topics.find(name); if(q == _topics.end()) { throw ObserverInconsistencyException("no topic: " + name); @@ -559,11 +555,11 @@ TopicManagerImpl::observerAddSubscriber(const LogUpdate& llu, const string& name void TopicManagerImpl::observerRemoveSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id) { - TopicImplPtr topic; + shared_ptr<TopicImpl> topic; { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - map<string, TopicImplPtr>::iterator q = _topics.find(name); + auto q = _topics.find(name); if(q == _topics.end()) { throw ObserverInconsistencyException("no topic: " + name); @@ -578,16 +574,16 @@ void TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) { { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); reap(); } try { content.clear(); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + for(const auto& topic : _topics) { - TopicContent rec = p->second->getContent(); + TopicContent rec = topic.second->getContent(); content.push_back(rec); } @@ -620,9 +616,9 @@ TopicManagerImpl::getLastLogUpdate() const } void -TopicManagerImpl::sync(const Ice::ObjectPrx& master) +TopicManagerImpl::sync(const shared_ptr<Ice::ObjectPrx>& master) { - TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master); + auto sync = Ice::uncheckedCast<TopicManagerSyncPrx>(master); LogUpdate llu; TopicContentSeq content; @@ -634,7 +630,7 @@ TopicManagerImpl::sync(const Ice::ObjectPrx& master) void TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu) { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); reap(); @@ -657,9 +653,9 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& IceDB::ReadWriteTxn txn(_instance->dbEnv()); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + for(const auto& topic : _topics) { - TopicContent rec = p->second->getContent(); + TopicContent rec = topic.second->getContent(); content.push_back(rec); } @@ -677,13 +673,13 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& _instance->observers()->init(slaves, llu, content); } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TopicManagerImpl::getObserver() const { return _observer; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TopicManagerImpl::getSync() const { return _sync; @@ -695,12 +691,10 @@ TopicManagerImpl::reap() // // Always called with mutex locked. // - // Lock sync(*this); - // vector<string> reaped = _instance->topicReaper()->consumeReapedTopics(); - for(vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p) + for(const auto& topic : reaped) { - map<string, TopicImplPtr>::iterator q = _topics.find(*p); + auto q = _topics.find(topic); if(q != _topics.end() && q->second->destroyed()) { _topics.erase(q); @@ -711,20 +705,18 @@ TopicManagerImpl::reap() void TopicManagerImpl::shutdown() { - Lock sync(*this); + lock_guard<recursive_mutex> lg(_mutex); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + for(const auto& topic : _topics) { - p->second->shutdown(); + topic.second->shutdown(); } _topics.clear(); - _observerImpl = 0; - _syncImpl = 0; - _managerImpl = 0; + _managerImpl = nullptr; } -Ice::ObjectPtr +shared_ptr<Ice::Object> TopicManagerImpl::getServant() const { return _managerImpl; @@ -733,31 +725,33 @@ TopicManagerImpl::getServant() const void TopicManagerImpl::updateTopicObservers() { - Lock sync(*this); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + lock_guard<recursive_mutex> lg(_mutex); + + for(const auto& topic : _topics) { - p->second->updateObserver(); + topic.second->updateObserver(); } } void TopicManagerImpl::updateSubscriberObservers() { - Lock sync(*this); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + lock_guard<recursive_mutex> lg(_mutex); + + for(const auto& topic : _topics) { - p->second->updateSubscriberObservers(); + topic.second->updateSubscriberObservers(); } } -TopicPrx +shared_ptr<TopicPrx> TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool create, const IceStorm::SubscriberRecordSeq& subscribers) { // // Called by constructor or with 'this' mutex locked. // - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); @@ -766,7 +760,7 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool out << "creating new topic \"" << name << "\". id: " << _instance->communicator()->identityToString(id) << " subscribers: "; - for(SubscriberRecordSeq::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) + for(auto q = subscribers.cbegin(); q != subscribers.cend(); ++q) { if(q != subscribers.begin()) { @@ -784,7 +778,7 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool out << "loading topic \"" << name << "\" from database. id: " << _instance->communicator()->identityToString(id) << " subscribers: "; - for(SubscriberRecordSeq::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) + for(auto q = subscribers.cbegin(); q != subscribers.cend(); ++q) { if(q != subscribers.begin()) { @@ -800,10 +794,10 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool } // Create topic implementation - TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers); + auto topicImpl = TopicImpl::create(_instance, name, id, subscribers); // The identity is the name of the Topic. - _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl)); + _topics.insert({ name, topicImpl }); _instance->topicAdapter()->add(topicImpl->getServant(), id); return topicImpl->proxy(); } |