diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 16:26:44 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:59:30 -0500 |
commit | 92a6531e409f2691d82591e185a92299d415fc0f (patch) | |
tree | 60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/Topics.cpp | |
parent | Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff) | |
download | ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2 ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz ice-92a6531e409f2691d82591e185a92299d415fc0f.zip |
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 381 |
1 files changed, 191 insertions, 190 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 8ab60af908a..119209dd636 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -24,14 +24,15 @@ Ice::EncodingVersion encodings[] = { } -ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) : +ObserverTopic::ObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, const string& name, + long long dbSerial) : _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial) { for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i) { ostringstream os; os << name << "-" << Ice::encodingVersionToString(encodings[i]); - IceStorm::TopicPrx t; + shared_ptr<IceStorm::TopicPrx> t; try { t = topicManager->create(os.str()); @@ -51,14 +52,10 @@ ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, cons } } -ObserverTopic::~ObserverTopic() -{ -} - int -ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) +ObserverTopic::subscribe(const shared_ptr<Ice::ObjectPrx>& obsv, const string& name) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -70,7 +67,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) IceStorm::QoS qos; qos["reliability"] = "ordered"; Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion()); - map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v); + auto p = _topics.find(v); if(p == _topics.end()) { Ice::Warning out(_logger); @@ -95,11 +92,11 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) } void -ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) +ObserverTopic::unsubscribe(const shared_ptr<Ice::ObjectPrx>& observer, const string& name) { - Lock sync(*this); + lock_guard lock(_mutex); Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion()); - map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator q = _topics.find(v); + auto q = _topics.find(v); if(q == _topics.end()) { return; @@ -119,7 +116,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) assert(_syncSubscribers.find(name) != _syncSubscribers.end()); _syncSubscribers.erase(name); - map<int, set<string> >::iterator p = _waitForUpdates.begin(); + auto p = _waitForUpdates.begin(); bool notifyMonitor = false; while(p != _waitForUpdates.end()) { @@ -137,7 +134,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) if(notifyMonitor) { - notifyAll(); + _condVar.notify_all(); } } } @@ -145,15 +142,15 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) void ObserverTopic::destroy() { - Lock sync(*this); + lock_guard lock(_mutex); _topics.clear(); - notifyAll(); + _condVar.notify_all(); } void ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure) { - Lock sync(*this); + lock_guard lock(_mutex); map<int, set<string> >::iterator p = _waitForUpdates.find(serial); if(p != _waitForUpdates.end()) { @@ -174,47 +171,15 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail _waitForUpdates.erase(p); } - notifyAll(); + _condVar.notify_all(); } } void ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) { - Lock sync(*this); - waitForSyncedSubscribersNoSync(serial, name); -} - -int -ObserverTopic::getSerial() const -{ - Lock sync(*this); - return _serial; -} - -void -ObserverTopic::addExpectedUpdate(int serial, const string& name) -{ - if(_syncSubscribers.empty() && name.empty()) - { - return; - } + unique_lock lock(_mutex); - // Must be called with the lock held. - if(name.empty()) - { - assert(_waitForUpdates[serial].empty()); - _waitForUpdates[serial] = _syncSubscribers; - } - else - { - _waitForUpdates[serial].insert(name); - } -} - -void -ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) -{ if(serial < 0) { return; @@ -225,10 +190,10 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) // while(!_topics.empty()) { - map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial); + auto p = _waitForUpdates.find(serial); if(p == _waitForUpdates.end()) { - map<int, map<string, string> >::iterator q = _updateFailures.find(serial); + auto q = _updateFailures.find(serial); if(q != _updateFailures.end()) { map<string, string> failures = q->second; @@ -250,13 +215,40 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) { return; } - wait(); + _condVar.wait(lock); } } } +int +ObserverTopic::getSerial() const +{ + lock_guard lock(_mutex); + return _serial; +} + +void +ObserverTopic::addExpectedUpdate(int serial, const string& name) +{ + if(_syncSubscribers.empty() && name.empty()) + { + return; + } + + // Must be called with the lock held. + if(name.empty()) + { + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; + } + else + { + _waitForUpdates[serial].insert(name); + } +} + void -ObserverTopic::updateSerial(Ice::Long dbSerial) +ObserverTopic::updateSerial(long long dbSerial) { ++_serial; if(dbSerial > 0) @@ -266,7 +258,7 @@ ObserverTopic::updateSerial(Ice::Long dbSerial) } Ice::Context -ObserverTopic::getContext(int serial, Ice::Long dbSerial) const +ObserverTopic::getContext(int serial, long long dbSerial) const { Ice::Context context; { @@ -283,7 +275,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const return context; } -RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : +RegistryObserverTopic::RegistryObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager) : ObserverTopic(topicManager, "RegistryObserver") { _publishers = getPublishers<RegistryObserverPrx>(); @@ -292,18 +284,18 @@ RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& to void RegistryObserverTopic::registryUp(const RegistryInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; } updateSerial(); - _registries.insert(make_pair(info.name, info)); + _registries.insert({ info.name, info }); try { - for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->registryUp(info); + publisher->registryUp(info); } } catch(const Ice::LocalException& ex) @@ -316,7 +308,7 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) void RegistryObserverTopic::registryDown(const string& name) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; @@ -331,9 +323,9 @@ RegistryObserverTopic::registryDown(const string& name) _registries.erase(name); try { - for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->registryDown(name); + publisher->registryDown(name); } } catch(const Ice::LocalException& ex) @@ -344,57 +336,67 @@ RegistryObserverTopic::registryDown(const string& name) } void -RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +RegistryObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv) { - RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv); + auto observer = Ice::uncheckedCast<RegistryObserverPrx>(obsv); RegistryInfoSeq registries; registries.reserve(_registries.size()); - for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p) + for(const auto& registry : _registries) { - registries.push_back(p->second); + registries.push_back(registry.second); } observer->registryInit(registries, getContext(_serial)); } -NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const Ice::ObjectAdapterPtr& adapter) : - ObserverTopic(topicManager, "NodeObserver") +shared_ptr<NodeObserverTopic> +NodeObserverTopic::create(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, + const shared_ptr<Ice::ObjectAdapter>& adapter) { - _publishers = getPublishers<NodeObserverPrx>(); + shared_ptr<NodeObserverTopic> topic(new NodeObserverTopic(topicManager)); + try { - const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this)); + const_cast<shared_ptr<NodeObserverPrx>&>(topic->_externalPublisher) = + Ice::uncheckedCast<NodeObserverPrx>(adapter->addWithUUID(topic)); } catch(const Ice::LocalException&) { } + + return topic; +} + +NodeObserverTopic::NodeObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager) : + ObserverTopic(topicManager, "NodeObserver") +{ + _publishers = getPublishers<NodeObserverPrx>(); } void -NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&) +NodeObserverTopic::nodeInit(NodeDynamicInfoSeq, const Ice::Current&) { assert(false); } void -NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) +NodeObserverTopic::nodeUp(NodeDynamicInfo info, const Ice::Current&) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; } updateSerial(); - _nodes.insert(make_pair(info.info.name, info)); - for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p) + _nodes.insert({ info.info.name, info }); + for(const auto& server : info.servers) { - _serverStatus[p->id] = p->enabled; + _serverStatus[server.id] = server.enabled; } try { - for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->nodeUp(info); + publisher->nodeUp(info); } } catch(const Ice::LocalException& ex) @@ -405,15 +407,15 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) } void -NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&) +NodeObserverTopic::nodeDown(string, const Ice::Current&) { assert(false); } void -NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) +NodeObserverTopic::updateServer(string node, ServerDynamicInfo server, const Ice::Current&) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; @@ -435,7 +437,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser { if(p->id == server.id) { - if(server.state == Destroyed || (server.state == Inactive && server.enabled)) + if(server.state == ServerState::Destroyed || (server.state == ServerState::Inactive && server.enabled)) { servers.erase(p); } @@ -447,12 +449,13 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser } ++p; } - if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end()) + if(server.state != ServerState::Destroyed && (server.state != ServerState::Inactive + || !server.enabled) && p == servers.end()) { servers.push_back(server); } - if(server.state != Destroyed) + if(server.state != ServerState::Destroyed) { _serverStatus[server.id] = server.enabled; } @@ -463,9 +466,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser try { - for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + for(const auto& publisher : _publishers) { - (*q)->updateServer(node, server); + publisher->updateServer(node, server); } } catch(const Ice::LocalException& ex) @@ -476,9 +479,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser } void -NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&) +NodeObserverTopic::updateAdapter(string node, AdapterDynamicInfo adapter, const Ice::Current&) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; @@ -519,9 +522,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a try { - for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + for(const auto& publisher : _publishers) { - (*q)->updateAdapter(node, adapter); + publisher->updateAdapter(node, adapter); } } catch(const Ice::LocalException& ex) @@ -534,7 +537,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a void NodeObserverTopic::nodeDown(const string& name) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return; @@ -548,17 +551,17 @@ NodeObserverTopic::nodeDown(const string& name) } ServerDynamicInfoSeq& servers = _nodes[name].servers; - for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + for(const auto& server : servers) { - _serverStatus.erase(p->id); + _serverStatus.erase(server.id); } _nodes.erase(name); try { - for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->nodeDown(name); + publisher->nodeDown(name); } } catch(const Ice::LocalException& ex) @@ -569,14 +572,14 @@ NodeObserverTopic::nodeDown(const string& name) } void -NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +NodeObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv) { - NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv); + auto observer = Ice::uncheckedCast<NodeObserverPrx>(obsv); NodeDynamicInfoSeq nodes; nodes.reserve(_nodes.size()); - for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p) + for(const auto& node : _nodes ) { - nodes.push_back(p->second); + nodes.push_back(node.second); } observer->nodeInit(nodes, getContext(_serial)); } @@ -584,7 +587,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) bool NodeObserverTopic::isServerEnabled(const string& server) const { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return false; @@ -600,8 +603,8 @@ NodeObserverTopic::isServerEnabled(const string& server) const } } -ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<string, ApplicationInfo>& applications, Ice::Long serial) : +ApplicationObserverTopic::ApplicationObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, + const map<string, ApplicationInfo>& applications, long long serial) : ObserverTopic(topicManager, "ApplicationObserver", serial), _applications(applications) { @@ -609,9 +612,9 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP } int -ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps) +ApplicationObserverTopic::applicationInit(long long dbSerial, const ApplicationInfoSeq& apps) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -624,9 +627,9 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI } try { - for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial)); + publisher->applicationInit(_serial, apps, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -639,9 +642,9 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI } int -ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info) +ApplicationObserverTopic::applicationAdded(long long dbSerial, const ApplicationInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -651,9 +654,9 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application _applications.insert(make_pair(info.descriptor.name, info)); try { - for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial)); + publisher->applicationAdded(_serial, info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -666,9 +669,9 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application } int -ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name) +ApplicationObserverTopic::applicationRemoved(long long dbSerial, const string& name) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -677,9 +680,9 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n _applications.erase(name); try { - for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial)); + publisher->applicationRemoved(_serial, name, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -692,9 +695,9 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n } int -ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info) +ApplicationObserverTopic::applicationUpdated(long long dbSerial, const ApplicationUpdateInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -733,9 +736,9 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati } try { - for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial)); + publisher->applicationUpdated(_serial, info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -748,19 +751,19 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati } void -ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ApplicationObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv) { - ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); + auto observer = Ice::uncheckedCast<ApplicationObserverPrx>(obsv); ApplicationInfoSeq applications; - for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p) + for(const auto& application : _applications) { - applications.push_back(p->second); + applications.push_back(application.second); } observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial)); } -AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<string, AdapterInfo>& adapters, Ice::Long serial) : +AdapterObserverTopic::AdapterObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, + const map<string, AdapterInfo>& adapters, long long serial) : ObserverTopic(topicManager, "AdapterObserver", serial), _adapters(adapters) { @@ -768,24 +771,24 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi } int -AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts) +AdapterObserverTopic::adapterInit(long long dbSerial, const AdapterInfoSeq& adpts) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; } updateSerial(dbSerial); _adapters.clear(); - for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + for(const auto& adpt : adpts) { - _adapters.insert(make_pair(q->id, *q)); + _adapters.insert({ adpt.id, adpt }); } try { - for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->adapterInit(adpts, getContext(_serial, dbSerial)); + publisher->adapterInit(adpts, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -798,9 +801,9 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt } int -AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) +AdapterObserverTopic::adapterAdded(long long dbSerial, const AdapterInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -809,9 +812,9 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) _adapters.insert(make_pair(info.id, info)); try { - for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->adapterAdded(info, getContext(_serial, dbSerial)); + publisher->adapterAdded(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -824,9 +827,9 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) } int -AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info) +AdapterObserverTopic::adapterUpdated(long long dbSerial, const AdapterInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -835,9 +838,9 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info _adapters[info.id] = info; try { - for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->adapterUpdated(info, getContext(_serial, dbSerial)); + publisher->adapterUpdated(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -850,9 +853,9 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info } int -AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) +AdapterObserverTopic::adapterRemoved(long long dbSerial, const string& id) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -861,9 +864,9 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) _adapters.erase(id); try { - for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->adapterRemoved(id, getContext(_serial, dbSerial)); + publisher->adapterRemoved(id, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -876,19 +879,19 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) } void -AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +AdapterObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv) { - AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); + auto observer = Ice::uncheckedCast<AdapterObserverPrx>(obsv); AdapterInfoSeq adapters; - for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + for(const auto& adapter : _adapters) { - adapters.push_back(p->second); + adapters.push_back(adapter.second); } observer->adapterInit(adapters, getContext(_serial, _dbSerial)); } -ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const map<Ice::Identity, ObjectInfo>& objects, Ice::Long serial) : +ObjectObserverTopic::ObjectObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, + const map<Ice::Identity, ObjectInfo>& objects, long long serial) : ObserverTopic(topicManager, "ObjectObserver", serial), _objects(objects) { @@ -896,24 +899,24 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM } int -ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects) +ObjectObserverTopic::objectInit(long long dbSerial, const ObjectInfoSeq& objects) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; } updateSerial(dbSerial); _objects.clear(); - for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) + for(const auto& object : objects) { - _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); + _objects.insert(make_pair(object.proxy->ice_getIdentity(), object)); } try { - for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->objectInit(objects, getContext(_serial, dbSerial)); + publisher->objectInit(objects, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -926,9 +929,9 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects } int -ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) +ObjectObserverTopic::objectAdded(long long dbSerial, const ObjectInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -937,9 +940,9 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->objectAdded(info, getContext(_serial, dbSerial)); + publisher->objectAdded(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -952,9 +955,9 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) } int -ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) +ObjectObserverTopic::objectUpdated(long long dbSerial, const ObjectInfo& info) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -963,9 +966,9 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) _objects[info.proxy->ice_getIdentity()] = info; try { - for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->objectUpdated(info, getContext(_serial, dbSerial)); + publisher->objectUpdated(info, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -978,9 +981,9 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) } int -ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) +ObjectObserverTopic::objectRemoved(long long dbSerial, const Ice::Identity& id) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; @@ -989,9 +992,9 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) _objects.erase(id); try { - for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) + for(const auto& publisher : _publishers) { - (*p)->objectRemoved(id, getContext(_serial, dbSerial)); + publisher->objectRemoved(id, getContext(_serial, dbSerial)); } } catch(const Ice::LocalException& ex) @@ -1006,24 +1009,24 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) int ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; } - for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + for(const auto& info : infos) { updateSerial(); - map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity()); + auto q = _objects.find(info.proxy->ice_getIdentity()); if(q != _objects.end()) { - q->second = *p; + q->second = info; try { - for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r) + for(const auto& publisher : _publishers) { - (*r)->objectUpdated(*p, getContext(_serial)); + publisher->objectUpdated(info, getContext(_serial)); } } catch(const Ice::LocalException& ex) @@ -1034,12 +1037,12 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) } else { - _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p)); + _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r) + for(const auto& publisher : _publishers) { - (*r)->objectAdded(*p, getContext(_serial)); + publisher->objectAdded(info, getContext(_serial)); } } catch(const Ice::LocalException& ex) @@ -1055,28 +1058,27 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) // here. This operation is called by ReplicaSessionI. // addExpectedUpdate(_serial); - //waitForSyncedSubscribersNoSync(_serial); return _serial; } int ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos) { - Lock sync(*this); + lock_guard lock(_mutex); if(_topics.empty()) { return -1; } - for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + for(const auto& info : infos) { updateSerial(); - _objects.erase(p->proxy->ice_getIdentity()); + _objects.erase(info.proxy->ice_getIdentity()); try { - for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q) + for(const auto& publisher : _publishers) { - (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial)); + publisher->objectRemoved(info.proxy->ice_getIdentity(), getContext(_serial)); } } catch(const Ice::LocalException& ex) @@ -1088,22 +1090,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos) // // We don't need to wait for the update to be received by the - // replicas here. This operation is only called internaly by + // replicas here. This operation is only called internally by // IceGrid. // addExpectedUpdate(_serial); - //waitForSyncedSubscribersNoSync(_serial); return _serial; } void -ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ObjectObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv) { - ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); + auto observer = Ice::uncheckedCast<ObjectObserverPrx>(obsv); ObjectInfoSeq objects; - for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + for(const auto& object : _objects) { - objects.push_back(p->second); + objects.push_back(object.second); } observer->objectInit(objects, getContext(_serial, _dbSerial)); } |