summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Topics.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2017-02-10 18:42:18 +0100
committerBenoit Foucher <benoit@zeroc.com>2017-02-10 18:42:18 +0100
commit3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11 (patch)
tree8a2475dda8b95d5b837bdf5415a1a968c57642c1 /cpp/src/IceGrid/Topics.cpp
parentFix (ICE-7331) - IceGridGUI preference preservation (diff)
downloadice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.bz2
ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.xz
ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.zip
Fixed ICE-7328 - IceGrid no longer returns proxies from disabled servers for ByType operations
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r--cpp/src/IceGrid/Topics.cpp162
1 files changed, 101 insertions, 61 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 578f688d0b4..140c2673140 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -139,7 +139,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
++p;
}
}
-
+
if(notifyMonitor)
{
notifyAll();
@@ -163,7 +163,7 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
if(p != _waitForUpdates.end())
{
p->second.erase(name);
-
+
if(!failure.empty())
{
map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
@@ -237,7 +237,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
if(q != _updateFailures.end())
{
map<string, string> failures = q->second;
- _updateFailures.erase(q);
+ _updateFailures.erase(q);
ostringstream os;
for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
{
@@ -288,7 +288,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
return context;
}
-RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
_publishers = getPublishers<RegistryObserverPrx>();
@@ -314,11 +314,11 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `registryUp' update:\n" << ex;
+ out << "unexpected exception while publishing `registryUp' update:\n" << ex;
}
}
-void
+void
RegistryObserverTopic::registryDown(const string& name)
{
Lock sync(*this);
@@ -344,7 +344,7 @@ RegistryObserverTopic::registryDown(const string& name)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `registryDown' update:\n" << ex;
+ out << "unexpected exception while publishing `registryDown' update:\n" << ex;
}
}
@@ -361,8 +361,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
observer->registryInit(registries, getContext(_serial));
}
-NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const Ice::ObjectAdapterPtr& adapter) :
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const Ice::ObjectAdapterPtr& adapter) :
ObserverTopic(topicManager, "NodeObserver")
{
_publishers = getPublishers<NodeObserverPrx>();
@@ -391,6 +391,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
}
updateSerial();
_nodes.insert(make_pair(info.info.name, info));
+ for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p)
+ {
+ _serverStatus[p->id] = p->enabled;
+ }
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -401,17 +405,17 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
+ out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&)
{
assert(false);
}
-void
+void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
@@ -427,7 +431,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
//
return;
}
-
+
updateSerial();
ServerDynamicInfoSeq& servers = _nodes[node].servers;
@@ -453,6 +457,15 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
servers.push_back(server);
}
+ if(server.state != Destroyed)
+ {
+ _serverStatus[server.id] = server.enabled;
+ }
+ else
+ {
+ _serverStatus.erase(server.id);
+ }
+
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -463,11 +476,11 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `updateServer' update:\n" << ex;
+ out << "unexpected exception while publishing `updateServer' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
{
Lock sync(*this);
@@ -508,7 +521,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
{
adapters.push_back(adapter);
}
-
+
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -519,11 +532,11 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
+ out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::nodeDown(const string& name)
{
Lock sync(*this);
@@ -534,22 +547,30 @@ NodeObserverTopic::nodeDown(const string& name)
updateSerial();
- if(_nodes.find(name) != _nodes.end())
+ if(_nodes.find(name) == _nodes.end())
{
- _nodes.erase(name);
- try
- {
- for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
- {
- (*p)->nodeDown(name);
- }
- }
- catch(const Ice::LocalException& ex)
+ return;
+ }
+
+ ServerDynamicInfoSeq& servers = _nodes[name].servers;
+ for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ {
+ _serverStatus.erase(p->id);
+ }
+
+ _nodes.erase(name);
+ try
+ {
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- Ice::Warning out(_logger);
- out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
+ (*p)->nodeDown(name);
}
}
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
+ }
}
void
@@ -565,6 +586,25 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
observer->nodeInit(nodes, getContext(_serial));
}
+bool
+NodeObserverTopic::isServerEnabled(const string& server) const
+{
+ Lock sync(*this);
+ if(_topics.empty())
+ {
+ return false;
+ }
+ map<string, bool>::const_iterator p = _serverStatus.find(server);
+ if(p != _serverStatus.end())
+ {
+ return p->second;
+ }
+ else
+ {
+ return true; // Assume the server is enabled if we don't know its status.
+ }
+}
+
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
const map<string, ApplicationInfo>& applications, Ice::Long serial) :
ObserverTopic(topicManager, "ApplicationObserver", serial),
@@ -597,13 +637,13 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
{
Lock sync(*this);
@@ -624,13 +664,13 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
{
Lock sync(*this);
@@ -650,13 +690,13 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
@@ -712,13 +752,13 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-void
+void
ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
@@ -738,7 +778,7 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
_publishers = getPublishers<AdapterObserverPrx>();
}
-int
+int
AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
{
Lock sync(*this);
@@ -762,13 +802,13 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
@@ -788,13 +828,13 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
@@ -814,7 +854,7 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -840,13 +880,13 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-void
+void
AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
@@ -854,7 +894,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
adapters.push_back(p->second);
- }
+ }
observer->adapterInit(adapters, getContext(_serial, _dbSerial));
}
@@ -866,7 +906,7 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
_publishers = getPublishers<ObjectObserverPrx>();
}
-int
+int
ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
{
Lock sync(*this);
@@ -890,13 +930,13 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectInit' update:\n" << ex;
+ out << "unexpected exception while publishing `objectInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
@@ -916,13 +956,13 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
@@ -942,7 +982,7 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -968,7 +1008,7 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -1000,7 +1040,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
}
else
@@ -1016,7 +1056,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
}
}
@@ -1024,7 +1064,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
//
// We don't wait for the update to be received by the replicas
// here. This operation is called by ReplicaSessionI.
- //
+ //
addExpectedUpdate(_serial);
//waitForSyncedSubscribersNoSync(_serial);
return _serial;
@@ -1053,21 +1093,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
}
//
// We don't need to wait for the update to be received by the
- // replicas here. This operation is only called internaly by
+ // replicas here. This operation is only called internaly by
// IceGrid.
- //
+ //
addExpectedUpdate(_serial);
//waitForSyncedSubscribersNoSync(_serial);
return _serial;
}
-void
+void
ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);