diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-02-10 18:42:18 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-02-10 18:42:18 +0100 |
commit | 3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11 (patch) | |
tree | 8a2475dda8b95d5b837bdf5415a1a968c57642c1 /cpp/src/IceGrid/Topics.cpp | |
parent | Fix (ICE-7331) - IceGridGUI preference preservation (diff) | |
download | ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.bz2 ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.xz ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.zip |
Fixed ICE-7328 - IceGrid no longer returns proxies from disabled servers for ByType operations
Diffstat (limited to 'cpp/src/IceGrid/Topics.cpp')
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 162 |
1 files changed, 101 insertions, 61 deletions
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 578f688d0b4..140c2673140 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -139,7 +139,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) ++p; } } - + if(notifyMonitor) { notifyAll(); @@ -163,7 +163,7 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail if(p != _waitForUpdates.end()) { p->second.erase(name); - + if(!failure.empty()) { map<int, map<string, string> >::iterator q = _updateFailures.find(serial); @@ -237,7 +237,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) if(q != _updateFailures.end()) { map<string, string> failures = q->second; - _updateFailures.erase(q); + _updateFailures.erase(q); ostringstream os; for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) { @@ -288,7 +288,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const return context; } -RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : +RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : ObserverTopic(topicManager, "RegistryObserver") { _publishers = getPublishers<RegistryObserverPrx>(); @@ -314,11 +314,11 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `registryUp' update:\n" << ex; + out << "unexpected exception while publishing `registryUp' update:\n" << ex; } } -void +void RegistryObserverTopic::registryDown(const string& name) { Lock sync(*this); @@ -344,7 +344,7 @@ RegistryObserverTopic::registryDown(const string& name) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `registryDown' update:\n" << ex; + out << "unexpected exception while publishing `registryDown' update:\n" << ex; } } @@ -361,8 +361,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) observer->registryInit(registries, getContext(_serial)); } -NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const Ice::ObjectAdapterPtr& adapter) : +NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const Ice::ObjectAdapterPtr& adapter) : ObserverTopic(topicManager, "NodeObserver") { _publishers = getPublishers<NodeObserverPrx>(); @@ -391,6 +391,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) } updateSerial(); _nodes.insert(make_pair(info.info.name, info)); + for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p) + { + _serverStatus[p->id] = p->enabled; + } try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -401,17 +405,17 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing 'nodeUp' update:\n" << ex; + out << "unexpected exception while publishing 'nodeUp' update:\n" << ex; } } -void +void NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&) { assert(false); } -void +void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); @@ -427,7 +431,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser // return; } - + updateSerial(); ServerDynamicInfoSeq& servers = _nodes[node].servers; @@ -453,6 +457,15 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser servers.push_back(server); } + if(server.state != Destroyed) + { + _serverStatus[server.id] = server.enabled; + } + else + { + _serverStatus.erase(server.id); + } + try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -463,11 +476,11 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `updateServer' update:\n" << ex; + out << "unexpected exception while publishing `updateServer' update:\n" << ex; } } -void +void NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&) { Lock sync(*this); @@ -508,7 +521,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a { adapters.push_back(adapter); } - + try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -519,11 +532,11 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `updateAdapter' update:\n" << ex; + out << "unexpected exception while publishing `updateAdapter' update:\n" << ex; } } -void +void NodeObserverTopic::nodeDown(const string& name) { Lock sync(*this); @@ -534,22 +547,30 @@ NodeObserverTopic::nodeDown(const string& name) updateSerial(); - if(_nodes.find(name) != _nodes.end()) + if(_nodes.find(name) == _nodes.end()) { - _nodes.erase(name); - try - { - for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) - { - (*p)->nodeDown(name); - } - } - catch(const Ice::LocalException& ex) + return; + } + + ServerDynamicInfoSeq& servers = _nodes[name].servers; + for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + { + _serverStatus.erase(p->id); + } + + _nodes.erase(name); + try + { + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - Ice::Warning out(_logger); - out << "unexpected exception while publishing `nodeDown' update:\n" << ex; + (*p)->nodeDown(name); } } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `nodeDown' update:\n" << ex; + } } void @@ -565,6 +586,25 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) observer->nodeInit(nodes, getContext(_serial)); } +bool +NodeObserverTopic::isServerEnabled(const string& server) const +{ + Lock sync(*this); + if(_topics.empty()) + { + return false; + } + map<string, bool>::const_iterator p = _serverStatus.find(server); + if(p != _serverStatus.end()) + { + return p->second; + } + else + { + return true; // Assume the server is enabled if we don't know its status. + } +} + ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const map<string, ApplicationInfo>& applications, Ice::Long serial) : ObserverTopic(topicManager, "ApplicationObserver", serial), @@ -597,13 +637,13 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationInit' update:\n" << ex; + out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info) { Lock sync(*this); @@ -624,13 +664,13 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; + out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name) { Lock sync(*this); @@ -650,13 +690,13 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; + out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info) { Lock sync(*this); @@ -712,13 +752,13 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; + out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -void +void ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); @@ -738,7 +778,7 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi _publishers = getPublishers<AdapterObserverPrx>(); } -int +int AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts) { Lock sync(*this); @@ -762,13 +802,13 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterInit' update:\n" << ex; + out << "unexpected exception while publishing `adapterInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); @@ -788,13 +828,13 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; + out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); @@ -814,7 +854,7 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; + out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -840,13 +880,13 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; + out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -void +void AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); @@ -854,7 +894,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { adapters.push_back(p->second); - } + } observer->adapterInit(adapters, getContext(_serial, _dbSerial)); } @@ -866,7 +906,7 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM _publishers = getPublishers<ObjectObserverPrx>(); } -int +int ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects) { Lock sync(*this); @@ -890,13 +930,13 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectInit' update:\n" << ex; + out << "unexpected exception while publishing `objectInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); @@ -916,13 +956,13 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); @@ -942,7 +982,7 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -968,7 +1008,7 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; + out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -1000,7 +1040,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } } else @@ -1016,7 +1056,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } } } @@ -1024,7 +1064,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) // // We don't wait for the update to be received by the replicas // here. This operation is called by ReplicaSessionI. - // + // addExpectedUpdate(_serial); //waitForSyncedSubscribersNoSync(_serial); return _serial; @@ -1053,21 +1093,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } } // // We don't need to wait for the update to be received by the - // replicas here. This operation is only called internaly by + // replicas here. This operation is only called internaly by // IceGrid. - // + // addExpectedUpdate(_serial); //waitForSyncedSubscribersNoSync(_serial); return _serial; } -void +void ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); |