summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp381
1 files changed, 191 insertions, 190 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 8ab60af908a..119209dd636 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -24,14 +24,15 @@ Ice::EncodingVersion encodings[] = {
}
-ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) :
+ObserverTopic::ObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager, const string& name,
+ long long dbSerial) :
_logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial)
{
for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i)
{
ostringstream os;
os << name << "-" << Ice::encodingVersionToString(encodings[i]);
- IceStorm::TopicPrx t;
+ shared_ptr<IceStorm::TopicPrx> t;
try
{
t = topicManager->create(os.str());
@@ -51,14 +52,10 @@ ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, cons
}
}
-ObserverTopic::~ObserverTopic()
-{
-}
-
int
-ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
+ObserverTopic::subscribe(const shared_ptr<Ice::ObjectPrx>& obsv, const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -70,7 +67,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
IceStorm::QoS qos;
qos["reliability"] = "ordered";
Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion());
- map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
+ auto p = _topics.find(v);
if(p == _topics.end())
{
Ice::Warning out(_logger);
@@ -95,11 +92,11 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
}
void
-ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
+ObserverTopic::unsubscribe(const shared_ptr<Ice::ObjectPrx>& observer, const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion());
- map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator q = _topics.find(v);
+ auto q = _topics.find(v);
if(q == _topics.end())
{
return;
@@ -119,7 +116,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
assert(_syncSubscribers.find(name) != _syncSubscribers.end());
_syncSubscribers.erase(name);
- map<int, set<string> >::iterator p = _waitForUpdates.begin();
+ auto p = _waitForUpdates.begin();
bool notifyMonitor = false;
while(p != _waitForUpdates.end())
{
@@ -137,7 +134,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
if(notifyMonitor)
{
- notifyAll();
+ _condVar.notify_all();
}
}
}
@@ -145,15 +142,15 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
void
ObserverTopic::destroy()
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_topics.clear();
- notifyAll();
+ _condVar.notify_all();
}
void
ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
if(p != _waitForUpdates.end())
{
@@ -174,47 +171,15 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
_waitForUpdates.erase(p);
}
- notifyAll();
+ _condVar.notify_all();
}
}
void
ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
{
- Lock sync(*this);
- waitForSyncedSubscribersNoSync(serial, name);
-}
-
-int
-ObserverTopic::getSerial() const
-{
- Lock sync(*this);
- return _serial;
-}
-
-void
-ObserverTopic::addExpectedUpdate(int serial, const string& name)
-{
- if(_syncSubscribers.empty() && name.empty())
- {
- return;
- }
+ unique_lock lock(_mutex);
- // Must be called with the lock held.
- if(name.empty())
- {
- assert(_waitForUpdates[serial].empty());
- _waitForUpdates[serial] = _syncSubscribers;
- }
- else
- {
- _waitForUpdates[serial].insert(name);
- }
-}
-
-void
-ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
-{
if(serial < 0)
{
return;
@@ -225,10 +190,10 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
//
while(!_topics.empty())
{
- map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
+ auto p = _waitForUpdates.find(serial);
if(p == _waitForUpdates.end())
{
- map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ auto q = _updateFailures.find(serial);
if(q != _updateFailures.end())
{
map<string, string> failures = q->second;
@@ -250,13 +215,40 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
{
return;
}
- wait();
+ _condVar.wait(lock);
}
}
}
+int
+ObserverTopic::getSerial() const
+{
+ lock_guard lock(_mutex);
+ return _serial;
+}
+
+void
+ObserverTopic::addExpectedUpdate(int serial, const string& name)
+{
+ if(_syncSubscribers.empty() && name.empty())
+ {
+ return;
+ }
+
+ // Must be called with the lock held.
+ if(name.empty())
+ {
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+ }
+ else
+ {
+ _waitForUpdates[serial].insert(name);
+ }
+}
+
void
-ObserverTopic::updateSerial(Ice::Long dbSerial)
+ObserverTopic::updateSerial(long long dbSerial)
{
++_serial;
if(dbSerial > 0)
@@ -266,7 +258,7 @@ ObserverTopic::updateSerial(Ice::Long dbSerial)
}
Ice::Context
-ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
+ObserverTopic::getContext(int serial, long long dbSerial) const
{
Ice::Context context;
{
@@ -283,7 +275,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
return context;
}
-RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
+RegistryObserverTopic::RegistryObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
_publishers = getPublishers<RegistryObserverPrx>();
@@ -292,18 +284,18 @@ RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& to
void
RegistryObserverTopic::registryUp(const RegistryInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
}
updateSerial();
- _registries.insert(make_pair(info.name, info));
+ _registries.insert({ info.name, info });
try
{
- for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->registryUp(info);
+ publisher->registryUp(info);
}
}
catch(const Ice::LocalException& ex)
@@ -316,7 +308,7 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
void
RegistryObserverTopic::registryDown(const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
@@ -331,9 +323,9 @@ RegistryObserverTopic::registryDown(const string& name)
_registries.erase(name);
try
{
- for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->registryDown(name);
+ publisher->registryDown(name);
}
}
catch(const Ice::LocalException& ex)
@@ -344,57 +336,67 @@ RegistryObserverTopic::registryDown(const string& name)
}
void
-RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+RegistryObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv)
{
- RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
+ auto observer = Ice::uncheckedCast<RegistryObserverPrx>(obsv);
RegistryInfoSeq registries;
registries.reserve(_registries.size());
- for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
+ for(const auto& registry : _registries)
{
- registries.push_back(p->second);
+ registries.push_back(registry.second);
}
observer->registryInit(registries, getContext(_serial));
}
-NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const Ice::ObjectAdapterPtr& adapter) :
- ObserverTopic(topicManager, "NodeObserver")
+shared_ptr<NodeObserverTopic>
+NodeObserverTopic::create(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
+ const shared_ptr<Ice::ObjectAdapter>& adapter)
{
- _publishers = getPublishers<NodeObserverPrx>();
+ shared_ptr<NodeObserverTopic> topic(new NodeObserverTopic(topicManager));
+
try
{
- const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
+ const_cast<shared_ptr<NodeObserverPrx>&>(topic->_externalPublisher) =
+ Ice::uncheckedCast<NodeObserverPrx>(adapter->addWithUUID(topic));
}
catch(const Ice::LocalException&)
{
}
+
+ return topic;
+}
+
+NodeObserverTopic::NodeObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager) :
+ ObserverTopic(topicManager, "NodeObserver")
+{
+ _publishers = getPublishers<NodeObserverPrx>();
}
void
-NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&)
+NodeObserverTopic::nodeInit(NodeDynamicInfoSeq, const Ice::Current&)
{
assert(false);
}
void
-NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
+NodeObserverTopic::nodeUp(NodeDynamicInfo info, const Ice::Current&)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
}
updateSerial();
- _nodes.insert(make_pair(info.info.name, info));
- for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p)
+ _nodes.insert({ info.info.name, info });
+ for(const auto& server : info.servers)
{
- _serverStatus[p->id] = p->enabled;
+ _serverStatus[server.id] = server.enabled;
}
try
{
- for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->nodeUp(info);
+ publisher->nodeUp(info);
}
}
catch(const Ice::LocalException& ex)
@@ -405,15 +407,15 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
}
void
-NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&)
+NodeObserverTopic::nodeDown(string, const Ice::Current&)
{
assert(false);
}
void
-NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
+NodeObserverTopic::updateServer(string node, ServerDynamicInfo server, const Ice::Current&)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
@@ -435,7 +437,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
{
if(p->id == server.id)
{
- if(server.state == Destroyed || (server.state == Inactive && server.enabled))
+ if(server.state == ServerState::Destroyed || (server.state == ServerState::Inactive && server.enabled))
{
servers.erase(p);
}
@@ -447,12 +449,13 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
}
++p;
}
- if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end())
+ if(server.state != ServerState::Destroyed && (server.state != ServerState::Inactive
+ || !server.enabled) && p == servers.end())
{
servers.push_back(server);
}
- if(server.state != Destroyed)
+ if(server.state != ServerState::Destroyed)
{
_serverStatus[server.id] = server.enabled;
}
@@ -463,9 +466,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
try
{
- for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ for(const auto& publisher : _publishers)
{
- (*q)->updateServer(node, server);
+ publisher->updateServer(node, server);
}
}
catch(const Ice::LocalException& ex)
@@ -476,9 +479,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
}
void
-NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
+NodeObserverTopic::updateAdapter(string node, AdapterDynamicInfo adapter, const Ice::Current&)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
@@ -519,9 +522,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
try
{
- for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ for(const auto& publisher : _publishers)
{
- (*q)->updateAdapter(node, adapter);
+ publisher->updateAdapter(node, adapter);
}
}
catch(const Ice::LocalException& ex)
@@ -534,7 +537,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
void
NodeObserverTopic::nodeDown(const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return;
@@ -548,17 +551,17 @@ NodeObserverTopic::nodeDown(const string& name)
}
ServerDynamicInfoSeq& servers = _nodes[name].servers;
- for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ for(const auto& server : servers)
{
- _serverStatus.erase(p->id);
+ _serverStatus.erase(server.id);
}
_nodes.erase(name);
try
{
- for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->nodeDown(name);
+ publisher->nodeDown(name);
}
}
catch(const Ice::LocalException& ex)
@@ -569,14 +572,14 @@ NodeObserverTopic::nodeDown(const string& name)
}
void
-NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+NodeObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv)
{
- NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
+ auto observer = Ice::uncheckedCast<NodeObserverPrx>(obsv);
NodeDynamicInfoSeq nodes;
nodes.reserve(_nodes.size());
- for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ for(const auto& node : _nodes )
{
- nodes.push_back(p->second);
+ nodes.push_back(node.second);
}
observer->nodeInit(nodes, getContext(_serial));
}
@@ -584,7 +587,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
bool
NodeObserverTopic::isServerEnabled(const string& server) const
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return false;
@@ -600,8 +603,8 @@ NodeObserverTopic::isServerEnabled(const string& server) const
}
}
-ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<string, ApplicationInfo>& applications, Ice::Long serial) :
+ApplicationObserverTopic::ApplicationObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
+ const map<string, ApplicationInfo>& applications, long long serial) :
ObserverTopic(topicManager, "ApplicationObserver", serial),
_applications(applications)
{
@@ -609,9 +612,9 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP
}
int
-ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps)
+ApplicationObserverTopic::applicationInit(long long dbSerial, const ApplicationInfoSeq& apps)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -624,9 +627,9 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI
}
try
{
- for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial));
+ publisher->applicationInit(_serial, apps, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -639,9 +642,9 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI
}
int
-ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
+ApplicationObserverTopic::applicationAdded(long long dbSerial, const ApplicationInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -651,9 +654,9 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application
_applications.insert(make_pair(info.descriptor.name, info));
try
{
- for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial));
+ publisher->applicationAdded(_serial, info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -666,9 +669,9 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application
}
int
-ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
+ApplicationObserverTopic::applicationRemoved(long long dbSerial, const string& name)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -677,9 +680,9 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n
_applications.erase(name);
try
{
- for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial));
+ publisher->applicationRemoved(_serial, name, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -692,9 +695,9 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n
}
int
-ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
+ApplicationObserverTopic::applicationUpdated(long long dbSerial, const ApplicationUpdateInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -733,9 +736,9 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati
}
try
{
- for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial));
+ publisher->applicationUpdated(_serial, info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -748,19 +751,19 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati
}
void
-ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ApplicationObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv)
{
- ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
+ auto observer = Ice::uncheckedCast<ApplicationObserverPrx>(obsv);
ApplicationInfoSeq applications;
- for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
+ for(const auto& application : _applications)
{
- applications.push_back(p->second);
+ applications.push_back(application.second);
}
observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial));
}
-AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<string, AdapterInfo>& adapters, Ice::Long serial) :
+AdapterObserverTopic::AdapterObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
+ const map<string, AdapterInfo>& adapters, long long serial) :
ObserverTopic(topicManager, "AdapterObserver", serial),
_adapters(adapters)
{
@@ -768,24 +771,24 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
}
int
-AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
+AdapterObserverTopic::adapterInit(long long dbSerial, const AdapterInfoSeq& adpts)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
}
updateSerial(dbSerial);
_adapters.clear();
- for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ for(const auto& adpt : adpts)
{
- _adapters.insert(make_pair(q->id, *q));
+ _adapters.insert({ adpt.id, adpt });
}
try
{
- for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->adapterInit(adpts, getContext(_serial, dbSerial));
+ publisher->adapterInit(adpts, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -798,9 +801,9 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt
}
int
-AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
+AdapterObserverTopic::adapterAdded(long long dbSerial, const AdapterInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -809,9 +812,9 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
_adapters.insert(make_pair(info.id, info));
try
{
- for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->adapterAdded(info, getContext(_serial, dbSerial));
+ publisher->adapterAdded(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -824,9 +827,9 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
}
int
-AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
+AdapterObserverTopic::adapterUpdated(long long dbSerial, const AdapterInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -835,9 +838,9 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info
_adapters[info.id] = info;
try
{
- for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->adapterUpdated(info, getContext(_serial, dbSerial));
+ publisher->adapterUpdated(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -850,9 +853,9 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info
}
int
-AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
+AdapterObserverTopic::adapterRemoved(long long dbSerial, const string& id)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -861,9 +864,9 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
_adapters.erase(id);
try
{
- for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->adapterRemoved(id, getContext(_serial, dbSerial));
+ publisher->adapterRemoved(id, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -876,19 +879,19 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
}
void
-AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+AdapterObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv)
{
- AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
+ auto observer = Ice::uncheckedCast<AdapterObserverPrx>(obsv);
AdapterInfoSeq adapters;
- for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ for(const auto& adapter : _adapters)
{
- adapters.push_back(p->second);
+ adapters.push_back(adapter.second);
}
observer->adapterInit(adapters, getContext(_serial, _dbSerial));
}
-ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const map<Ice::Identity, ObjectInfo>& objects, Ice::Long serial) :
+ObjectObserverTopic::ObjectObserverTopic(const shared_ptr<IceStorm::TopicManagerPrx>& topicManager,
+ const map<Ice::Identity, ObjectInfo>& objects, long long serial) :
ObserverTopic(topicManager, "ObjectObserver", serial),
_objects(objects)
{
@@ -896,24 +899,24 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
}
int
-ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
+ObjectObserverTopic::objectInit(long long dbSerial, const ObjectInfoSeq& objects)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
}
updateSerial(dbSerial);
_objects.clear();
- for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ for(const auto& object : objects)
{
- _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ _objects.insert(make_pair(object.proxy->ice_getIdentity(), object));
}
try
{
- for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->objectInit(objects, getContext(_serial, dbSerial));
+ publisher->objectInit(objects, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -926,9 +929,9 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects
}
int
-ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
+ObjectObserverTopic::objectAdded(long long dbSerial, const ObjectInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -937,9 +940,9 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->objectAdded(info, getContext(_serial, dbSerial));
+ publisher->objectAdded(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -952,9 +955,9 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
}
int
-ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
+ObjectObserverTopic::objectUpdated(long long dbSerial, const ObjectInfo& info)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -963,9 +966,9 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->objectUpdated(info, getContext(_serial, dbSerial));
+ publisher->objectUpdated(info, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -978,9 +981,9 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
}
int
-ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
+ObjectObserverTopic::objectRemoved(long long dbSerial, const Ice::Identity& id)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
@@ -989,9 +992,9 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
_objects.erase(id);
try
{
- for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ for(const auto& publisher : _publishers)
{
- (*p)->objectRemoved(id, getContext(_serial, dbSerial));
+ publisher->objectRemoved(id, getContext(_serial, dbSerial));
}
}
catch(const Ice::LocalException& ex)
@@ -1006,24 +1009,24 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
int
ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
}
- for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ for(const auto& info : infos)
{
updateSerial();
- map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity());
+ auto q = _objects.find(info.proxy->ice_getIdentity());
if(q != _objects.end())
{
- q->second = *p;
+ q->second = info;
try
{
- for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
+ for(const auto& publisher : _publishers)
{
- (*r)->objectUpdated(*p, getContext(_serial));
+ publisher->objectUpdated(info, getContext(_serial));
}
}
catch(const Ice::LocalException& ex)
@@ -1034,12 +1037,12 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
}
else
{
- _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
+ _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
+ for(const auto& publisher : _publishers)
{
- (*r)->objectAdded(*p, getContext(_serial));
+ publisher->objectAdded(info, getContext(_serial));
}
}
catch(const Ice::LocalException& ex)
@@ -1055,28 +1058,27 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
// here. This operation is called by ReplicaSessionI.
//
addExpectedUpdate(_serial);
- //waitForSyncedSubscribersNoSync(_serial);
return _serial;
}
int
ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_topics.empty())
{
return -1;
}
- for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ for(const auto& info : infos)
{
updateSerial();
- _objects.erase(p->proxy->ice_getIdentity());
+ _objects.erase(info.proxy->ice_getIdentity());
try
{
- for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ for(const auto& publisher : _publishers)
{
- (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ publisher->objectRemoved(info.proxy->ice_getIdentity(), getContext(_serial));
}
}
catch(const Ice::LocalException& ex)
@@ -1088,22 +1090,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
//
// We don't need to wait for the update to be received by the
- // replicas here. This operation is only called internaly by
+ // replicas here. This operation is only called internally by
// IceGrid.
//
addExpectedUpdate(_serial);
- //waitForSyncedSubscribersNoSync(_serial);
return _serial;
}
void
-ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ObjectObserverTopic::initObserver(const shared_ptr<Ice::ObjectPrx>& obsv)
{
- ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
+ auto observer = Ice::uncheckedCast<ObjectObserverPrx>(obsv);
ObjectInfoSeq objects;
- for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ for(const auto& object : _objects)
{
- objects.push_back(p->second);
+ objects.push_back(object.second);
}
observer->objectInit(objects, getContext(_serial, _dbSerial));
}