diff options
author | Benoit Foucher <benoit@zeroc.com> | 2017-02-10 18:42:18 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2017-02-10 18:42:18 +0100 |
commit | 3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11 (patch) | |
tree | 8a2475dda8b95d5b837bdf5415a1a968c57642c1 | |
parent | Fix (ICE-7331) - IceGridGUI preference preservation (diff) | |
download | ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.bz2 ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.xz ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.zip |
Fixed ICE-7328 - IceGrid no longer returns proxies from disabled servers for ByType operations
-rw-r--r-- | CHANGELOG-3.7.md | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/Allocatable.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/AllocatableObjectCache.cpp | 27 | ||||
-rw-r--r-- | cpp/src/IceGrid/AllocatableObjectCache.h | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 17 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.cpp | 27 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.h | 17 | ||||
-rw-r--r-- | cpp/src/IceGrid/QueryI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 15 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.h | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 162 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 5 | ||||
-rw-r--r-- | cpp/test/IceGrid/activation/AllTests.cpp | 14 | ||||
-rw-r--r-- | cpp/test/IceGrid/activation/application.xml | 14 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/AllTests.cpp | 23 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/application.xml | 4 |
17 files changed, 250 insertions, 106 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md index efe1c7612b3..309367ac13d 100644 --- a/CHANGELOG-3.7.md +++ b/CHANGELOG-3.7.md @@ -17,6 +17,12 @@ These are the changes since Ice 3.6.3. ## General Changes +- The `findObjectByType`, `findAllObjectsByType`, `findObjectByTypeOnLeastLoadedNode` + operations from the `IceGrid::Query` interface and the `allocateObjectByType` + operation from the `IceGrid::Session` interfaces now only returns proxies for + Ice objects from enabled servers. If a server is disabled, its well-known or + allocatable Ice objects won't be returned anymore to clients. + - A Slice enumeration (enum) creates now a new namespace scope for its enumerators. In previous releases, the enumerators were in the same namespace scope as the enumeration. For example: diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h index 0f10b94f4ae..2bf31a4447c 100644 --- a/cpp/src/IceGrid/Allocatable.h +++ b/cpp/src/IceGrid/Allocatable.h @@ -85,6 +85,7 @@ public: bool isAllocatable() const { return _allocatable; } SessionIPtr getSession() const; + virtual bool isEnabled() const = 0; virtual void allocated(const SessionIPtr&) = 0; virtual void released(const SessionIPtr&) = 0; virtual bool canTryAllocate() { return false; } diff --git a/cpp/src/IceGrid/AllocatableObjectCache.cpp b/cpp/src/IceGrid/AllocatableObjectCache.cpp index 3a9799e1eac..438add8b263 100644 --- a/cpp/src/IceGrid/AllocatableObjectCache.cpp +++ b/cpp/src/IceGrid/AllocatableObjectCache.cpp @@ -12,6 +12,7 @@ #include <Ice/LoggerUtil.h> #include <Ice/LocalException.h> #include <IceGrid/AllocatableObjectCache.h> +#include <IceGrid/ServerCache.h> #include <IceGrid/SessionI.h> using namespace std; @@ -126,7 +127,7 @@ AllocatableObjectCache::AllocatableObjectCache(const Ice::CommunicatorPtr& commu } void -AllocatableObjectCache::add(const ObjectInfo& info, const AllocatablePtr& parent) +AllocatableObjectCache::add(const ObjectInfo& info, const ServerEntryPtr& parent) { const Ice::Identity& id = info.proxy->ice_getIdentity(); @@ -217,13 +218,18 @@ AllocatableObjectCache::allocateByType(const string& type, const ObjectAllocatio vector<AllocatableObjectEntryPtr> objects = p->second.getObjects(); RandomNumberGenerator rng; random_shuffle(objects.begin(), objects.end(), rng); // TODO: OPTIMIZE + int allocatable = 0; try { for(vector<AllocatableObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q) { - if((*q)->tryAllocate(request)) + if((*q)->isEnabled()) { - return; + ++allocatable; + if((*q)->tryAllocate(request)) + { + return; + } } } } @@ -231,7 +237,10 @@ AllocatableObjectCache::allocateByType(const string& type, const ObjectAllocatio { return; // The request has been answered already, no need to throw here. } - + if(allocatable == 0) + { + throw AllocationException("no allocatable objects with type `" + type + "' enabled"); + } p->second.addAllocationRequest(request); } @@ -252,12 +261,14 @@ AllocatableObjectCache::canTryAllocate(const AllocatableObjectEntryPtr& entry) AllocatableObjectEntry::AllocatableObjectEntry(AllocatableObjectCache& cache, const ObjectInfo& info, - const AllocatablePtr& parent) : + const ServerEntryPtr& parent) : Allocatable(true, parent), _cache(cache), _info(info), + _server(parent), _destroyed(false) { + assert(_server); } Ice::ObjectPrx @@ -278,6 +289,12 @@ AllocatableObjectEntry::canRemove() return true; } +bool +AllocatableObjectEntry::isEnabled() const +{ + return _server->isEnabled(); +} + void AllocatableObjectEntry::allocated(const SessionIPtr& session) { diff --git a/cpp/src/IceGrid/AllocatableObjectCache.h b/cpp/src/IceGrid/AllocatableObjectCache.h index 382810c3653..9f2db6f3121 100644 --- a/cpp/src/IceGrid/AllocatableObjectCache.h +++ b/cpp/src/IceGrid/AllocatableObjectCache.h @@ -19,18 +19,22 @@ namespace IceGrid { +class ServerEntry; +typedef IceUtil::Handle<ServerEntry> ServerEntryPtr; + class AllocatableObjectCache; class AllocatableObjectEntry : public Allocatable { public: - AllocatableObjectEntry(AllocatableObjectCache&, const ObjectInfo&, const AllocatablePtr&); + AllocatableObjectEntry(AllocatableObjectCache&, const ObjectInfo&, const ServerEntryPtr&); Ice::ObjectPrx getProxy() const; std::string getType() const; bool canRemove(); + virtual bool isEnabled() const; virtual void allocated(const SessionIPtr&); virtual void released(const SessionIPtr&); virtual bool canTryAllocate(); @@ -42,6 +46,7 @@ private: AllocatableObjectCache& _cache; const ObjectInfo _info; + ServerEntryPtr _server; bool _destroyed; }; typedef IceUtil::Handle<AllocatableObjectEntry> AllocatableObjectEntryPtr; @@ -77,7 +82,7 @@ public: AllocatableObjectCache(const Ice::CommunicatorPtr&); - void add(const ObjectInfo&, const AllocatablePtr&); + void add(const ObjectInfo&, const ServerEntryPtr&); AllocatableObjectEntryPtr get(const Ice::Identity&) const; void remove(const Ice::Identity&); diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index ce458e52936..e20d176cbe8 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -265,6 +265,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); + _serverCache.setNodeObserverTopic(_nodeObserverTopic); + // Set all serials to 1 if they have not yet been set. Ice::Long serial; if(!_serials.get(txn, applicationsDbName, serial)) @@ -1777,7 +1779,16 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample Ice::ObjectProxySeq Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { - Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); + Ice::ObjectProxySeq proxies; + + vector<ObjectEntryPtr> objects = _objectCache.getObjectsByType(type); + for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + if(_nodeObserverTopic->isServerEnabled((*q)->getServer())) // Only return proxies from enabled servers. + { + proxies.push_back((*q)->getProxy()); + } + } IceDB::ReadOnlyTxn txn(_env); vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type); @@ -2162,7 +2173,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri _adapterCache.addReplicaGroup(*r, application); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { - _objectCache.add(toObjectInfo(_communicator, *o, r->id), application); + _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, ""); } } @@ -2305,7 +2316,7 @@ Database::reload(const ApplicationHelper& oldApp, for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { - _objectCache.add(toObjectInfo(_communicator, *o, r->id), application); + _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, ""); } } diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index e83d6c3cdc3..294e20f393d 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -971,6 +971,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) if(sent.find(p->second) == sent.end()) { queueUpdate(p->second, new UpdateServer(this, p->second, info)); + sent.insert(p->second); } } } @@ -1001,6 +1002,7 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) if(sent.find(p->second) == sent.end()) { queueUpdate(p->second, new UpdateAdapter(this, p->second, info)); + sent.insert(p->second); } } } diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp index cc76cc2d9b7..8cd6aaedeb2 100644 --- a/cpp/src/IceGrid/ObjectCache.cpp +++ b/cpp/src/IceGrid/ObjectCache.cpp @@ -74,7 +74,7 @@ ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator) : _communicat } void -ObjectCache::add(const ObjectInfo& info, const string& application) +ObjectCache::add(const ObjectInfo& info, const string& application, const string& server) { const Ice::Identity& id = info.proxy->ice_getIdentity(); @@ -86,7 +86,7 @@ ObjectCache::add(const ObjectInfo& info, const string& application) return; } - ObjectEntryPtr entry = new ObjectEntry(info, application); + ObjectEntryPtr entry = new ObjectEntry(info, application, server); addImpl(id, entry); map<string, TypeEntry>::iterator p = _types.find(entry->getType()); @@ -142,22 +142,16 @@ ObjectCache::remove(const Ice::Identity& id) } } -Ice::ObjectProxySeq +vector<ObjectEntryPtr> ObjectCache::getObjectsByType(const string& type) { Lock sync(*this); - Ice::ObjectProxySeq proxies; map<string, TypeEntry>::const_iterator p = _types.find(type); if(p == _types.end()) { - return proxies; + return vector<ObjectEntryPtr>(); } - const vector<ObjectEntryPtr>& objects = p->second.getObjects(); - for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q) - { - proxies.push_back((*q)->getProxy()); - } - return proxies; + return p->second.getObjects(); } ObjectInfoSeq @@ -194,9 +188,10 @@ ObjectCache::getAllByType(const string& type) return infos; } -ObjectEntry::ObjectEntry(const ObjectInfo& info, const string& application) : +ObjectEntry::ObjectEntry(const ObjectInfo& info, const string& application, const string& server) : _info(info), - _application(application) + _application(application), + _server(server) { } @@ -218,6 +213,12 @@ ObjectEntry::getApplication() const return _application; } +string +ObjectEntry::getServer() const +{ + return _server; +} + const ObjectInfo& ObjectEntry::getObjectInfo() const { diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h index fba99917e1b..fbeb5aeb9f4 100644 --- a/cpp/src/IceGrid/ObjectCache.h +++ b/cpp/src/IceGrid/ObjectCache.h @@ -23,11 +23,12 @@ class ObjectCache; class ObjectEntry : public IceUtil::Shared { public: - - ObjectEntry(const ObjectInfo&, const std::string&); + + ObjectEntry(const ObjectInfo&, const std::string&, const std::string&); Ice::ObjectPrx getProxy() const; std::string getType() const; std::string getApplication() const; + std::string getServer() const; const ObjectInfo& getObjectInfo() const; bool canRemove(); @@ -36,6 +37,7 @@ private: const ObjectInfo _info; const std::string _application; + const std::string _server; }; typedef IceUtil::Handle<ObjectEntry> ObjectEntryPtr; @@ -45,18 +47,19 @@ public: ObjectCache(const Ice::CommunicatorPtr&); - void add(const ObjectInfo&, const std::string&); + void add(const ObjectInfo&, const std::string&, const std::string&); ObjectEntryPtr get(const Ice::Identity&) const; void remove(const Ice::Identity&); - Ice::ObjectProxySeq getObjectsByType(const std::string&); + std::vector<ObjectEntryPtr> getObjectsByType(const std::string&); + ObjectInfoSeq getAll(const std::string&); ObjectInfoSeq getAllByType(const std::string&); const Ice::CommunicatorPtr& getCommunicator() const { return _communicator; } private: - + class TypeEntry { public: @@ -65,11 +68,11 @@ private: void add(const ObjectEntryPtr&); bool remove(const ObjectEntryPtr&); - + const std::vector<ObjectEntryPtr>& getObjects() const { return _objects; } private: - + std::vector<ObjectEntryPtr> _objects; }; diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp index 366f5eb607d..713b197c36c 100644 --- a/cpp/src/IceGrid/QueryI.cpp +++ b/cpp/src/IceGrid/QueryI.cpp @@ -86,7 +86,7 @@ QueryI::findAllReplicas(const Ice::ObjectPrx& proxy, const Ice::Current& current try { AdapterInfoSeq infos = _database->getFilteredAdapterInfo(prx->ice_getAdapterId(), current.con, current.ctx); - if(infos.empty() || infos[0].replicaGroupId != prx->ice_getAdapterId()) + if(infos.empty() || infos[0].replicaGroupId != prx->ice_getAdapterId()) { // The adapter id doesn't refer to a replica group or the replica group is empty. return Ice::ObjectProxySeq(); diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index 03b4332f43b..bafafa42553 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -18,6 +18,7 @@ #include <IceGrid/AllocatableObjectCache.h> #include <IceGrid/SessionI.h> #include <IceGrid/DescriptorHelper.h> +#include <IceGrid/Topics.h> using namespace std; using namespace IceGrid; @@ -263,6 +264,12 @@ ServerCache::clear(const string& id) } void +ServerCache::setNodeObserverTopic(const NodeObserverTopicPtr& nodeObserverTopic) +{ + _nodeObserverTopic = nodeObserverTopic; +} + +void ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc, const ServerEntryPtr& server, @@ -291,7 +298,7 @@ ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc, for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { - _objectCache.add(toObjectInfo(_communicator, *r, q->id), application); + _objectCache.add(toObjectInfo(_communicator, *r, q->id), application, server->getId()); } for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r) { @@ -1091,6 +1098,12 @@ ServerEntry::checkUpdate(const ServerInfo& info, bool noRestart) return new CheckUpdateResult(_id, oldInfo.node, noRestart, desc, server->begin_checkUpdate(desc, noRestart)); } +bool +ServerEntry::isEnabled() const +{ + return _cache.getNodeObserverTopic()->isServerEnabled(_id); +} + void ServerEntry::allocated(const SessionIPtr& session) { diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h index d604439caa8..8379fe4f8c3 100644 --- a/cpp/src/IceGrid/ServerCache.h +++ b/cpp/src/IceGrid/ServerCache.h @@ -34,6 +34,9 @@ typedef IceUtil::Handle<NodeEntry> NodeEntryPtr; class CheckServerResult; typedef IceUtil::Handle<CheckServerResult> CheckServerResultPtr; +class NodeObserverTopic; +typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr; + class CheckUpdateResult : public IceUtil::Shared { public: @@ -97,6 +100,7 @@ public: void destroyCallback(); void exception(const Ice::Exception&); + virtual bool isEnabled() const; virtual void allocated(const SessionIPtr&); virtual void allocatedNoSync(const SessionIPtr&); virtual void released(const SessionIPtr&); @@ -139,7 +143,8 @@ public: using CacheByString<ServerEntry>::remove; #endif - ServerCache(const Ice::CommunicatorPtr&, const std::string&, NodeCache&, AdapterCache&, ObjectCache&, AllocatableObjectCache&); + ServerCache(const Ice::CommunicatorPtr&, const std::string&, NodeCache&, AdapterCache&, ObjectCache&, + AllocatableObjectCache&); ServerEntryPtr add(const ServerInfo&); ServerEntryPtr get(const std::string&) const; @@ -155,6 +160,9 @@ public: Ice::CommunicatorPtr getCommunicator() const { return _communicator; } const std::string& getInstanceName() const { return _instanceName; } + const NodeObserverTopicPtr& getNodeObserverTopic() const { return _nodeObserverTopic; } + void setNodeObserverTopic(const NodeObserverTopicPtr&); + private: void addCommunicator(const CommunicatorDescriptorPtr&, const CommunicatorDescriptorPtr&, const ServerEntryPtr&, @@ -170,6 +178,7 @@ private: AdapterCache& _adapterCache; ObjectCache& _objectCache; AllocatableObjectCache& _allocatableObjectCache; + NodeObserverTopicPtr _nodeObserverTopic; }; }; diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 578f688d0b4..140c2673140 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -139,7 +139,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) ++p; } } - + if(notifyMonitor) { notifyAll(); @@ -163,7 +163,7 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail if(p != _waitForUpdates.end()) { p->second.erase(name); - + if(!failure.empty()) { map<int, map<string, string> >::iterator q = _updateFailures.find(serial); @@ -237,7 +237,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) if(q != _updateFailures.end()) { map<string, string> failures = q->second; - _updateFailures.erase(q); + _updateFailures.erase(q); ostringstream os; for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) { @@ -288,7 +288,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const return context; } -RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : +RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : ObserverTopic(topicManager, "RegistryObserver") { _publishers = getPublishers<RegistryObserverPrx>(); @@ -314,11 +314,11 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `registryUp' update:\n" << ex; + out << "unexpected exception while publishing `registryUp' update:\n" << ex; } } -void +void RegistryObserverTopic::registryDown(const string& name) { Lock sync(*this); @@ -344,7 +344,7 @@ RegistryObserverTopic::registryDown(const string& name) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `registryDown' update:\n" << ex; + out << "unexpected exception while publishing `registryDown' update:\n" << ex; } } @@ -361,8 +361,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) observer->registryInit(registries, getContext(_serial)); } -NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, - const Ice::ObjectAdapterPtr& adapter) : +NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, + const Ice::ObjectAdapterPtr& adapter) : ObserverTopic(topicManager, "NodeObserver") { _publishers = getPublishers<NodeObserverPrx>(); @@ -391,6 +391,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) } updateSerial(); _nodes.insert(make_pair(info.info.name, info)); + for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p) + { + _serverStatus[p->id] = p->enabled; + } try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -401,17 +405,17 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing 'nodeUp' update:\n" << ex; + out << "unexpected exception while publishing 'nodeUp' update:\n" << ex; } } -void +void NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&) { assert(false); } -void +void NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&) { Lock sync(*this); @@ -427,7 +431,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser // return; } - + updateSerial(); ServerDynamicInfoSeq& servers = _nodes[node].servers; @@ -453,6 +457,15 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser servers.push_back(server); } + if(server.state != Destroyed) + { + _serverStatus[server.id] = server.enabled; + } + else + { + _serverStatus.erase(server.id); + } + try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -463,11 +476,11 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `updateServer' update:\n" << ex; + out << "unexpected exception while publishing `updateServer' update:\n" << ex; } } -void +void NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&) { Lock sync(*this); @@ -508,7 +521,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a { adapters.push_back(adapter); } - + try { for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) @@ -519,11 +532,11 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `updateAdapter' update:\n" << ex; + out << "unexpected exception while publishing `updateAdapter' update:\n" << ex; } } -void +void NodeObserverTopic::nodeDown(const string& name) { Lock sync(*this); @@ -534,22 +547,30 @@ NodeObserverTopic::nodeDown(const string& name) updateSerial(); - if(_nodes.find(name) != _nodes.end()) + if(_nodes.find(name) == _nodes.end()) { - _nodes.erase(name); - try - { - for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) - { - (*p)->nodeDown(name); - } - } - catch(const Ice::LocalException& ex) + return; + } + + ServerDynamicInfoSeq& servers = _nodes[name].servers; + for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + { + _serverStatus.erase(p->id); + } + + _nodes.erase(name); + try + { + for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p) { - Ice::Warning out(_logger); - out << "unexpected exception while publishing `nodeDown' update:\n" << ex; + (*p)->nodeDown(name); } } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `nodeDown' update:\n" << ex; + } } void @@ -565,6 +586,25 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) observer->nodeInit(nodes, getContext(_serial)); } +bool +NodeObserverTopic::isServerEnabled(const string& server) const +{ + Lock sync(*this); + if(_topics.empty()) + { + return false; + } + map<string, bool>::const_iterator p = _serverStatus.find(server); + if(p != _serverStatus.end()) + { + return p->second; + } + else + { + return true; // Assume the server is enabled if we don't know its status. + } +} + ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const map<string, ApplicationInfo>& applications, Ice::Long serial) : ObserverTopic(topicManager, "ApplicationObserver", serial), @@ -597,13 +637,13 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationInit' update:\n" << ex; + out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info) { Lock sync(*this); @@ -624,13 +664,13 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; + out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name) { Lock sync(*this); @@ -650,13 +690,13 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; + out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info) { Lock sync(*this); @@ -712,13 +752,13 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; + out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -void +void ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); @@ -738,7 +778,7 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi _publishers = getPublishers<AdapterObserverPrx>(); } -int +int AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts) { Lock sync(*this); @@ -762,13 +802,13 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterInit' update:\n" << ex; + out << "unexpected exception while publishing `adapterInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); @@ -788,13 +828,13 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; + out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info) { Lock sync(*this); @@ -814,7 +854,7 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; + out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -840,13 +880,13 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; + out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -void +void AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); @@ -854,7 +894,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { adapters.push_back(p->second); - } + } observer->adapterInit(adapters, getContext(_serial, _dbSerial)); } @@ -866,7 +906,7 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM _publishers = getPublishers<ObjectObserverPrx>(); } -int +int ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects) { Lock sync(*this); @@ -890,13 +930,13 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectInit' update:\n" << ex; + out << "unexpected exception while publishing `objectInit' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); @@ -916,13 +956,13 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; } -int +int ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) { Lock sync(*this); @@ -942,7 +982,7 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -968,7 +1008,7 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; + out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } addExpectedUpdate(_serial); return _serial; @@ -1000,7 +1040,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } } else @@ -1016,7 +1056,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } } } @@ -1024,7 +1064,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos) // // We don't wait for the update to be received by the replicas // here. This operation is called by ReplicaSessionI. - // + // addExpectedUpdate(_serial); //waitForSyncedSubscribersNoSync(_serial); return _serial; @@ -1053,21 +1093,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos) catch(const Ice::LocalException& ex) { Ice::Warning out(_logger); - out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } } // // We don't need to wait for the update to be received by the - // replicas here. This operation is only called internaly by + // replicas here. This operation is only called internaly by // IceGrid. - // + // addExpectedUpdate(_serial); //waitForSyncedSubscribersNoSync(_serial); return _serial; } -void +void ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index ceaf6f4ca38..44affbacec3 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -88,7 +88,7 @@ typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; class NodeObserverTopic : public ObserverTopic, public NodeObserver { public: - + NodeObserverTopic(const IceStorm::TopicManagerPrx&, const Ice::ObjectAdapterPtr&); virtual void nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&); @@ -102,11 +102,14 @@ public: void nodeDown(const std::string&); virtual void initObserver(const Ice::ObjectPrx&); + bool isServerEnabled(const std::string&) const; + private: const NodeObserverPrx _externalPublisher; std::vector<NodeObserverPrx> _publishers; std::map<std::string, NodeDynamicInfo> _nodes; + std::map<std::string, bool> _serverStatus; }; typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr; diff --git a/cpp/test/IceGrid/activation/AllTests.cpp b/cpp/test/IceGrid/activation/AllTests.cpp index 3ae92ba6e95..7fba9e9975e 100644 --- a/cpp/test/IceGrid/activation/AllTests.cpp +++ b/cpp/test/IceGrid/activation/AllTests.cpp @@ -92,6 +92,10 @@ allTests(const Ice::CommunicatorPtr& communicator) IceGrid::RegistryPrx registry = IceGrid::RegistryPrx::checkedCast( communicator->stringToProxy(communicator->getDefaultLocator()->ice_getIdentity().category + "/Registry")); test(registry); + + IceGrid::QueryPrx query = IceGrid::QueryPrx::checkedCast( + communicator->stringToProxy(communicator->getDefaultLocator()->ice_getIdentity().category + "/Query")); + IceGrid::AdminSessionPrx session = registry->createAdminSession("foo", "bar"); session->ice_getConnection()->setACM(registry->getACMTimeout(), IceUtil::None, Ice::ICE_ENUM(ACMHeartbeat, HeartbeatAlways)); @@ -253,6 +257,8 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "testing server disable... " << flush; try { + int count = query->findAllObjectsByType("Test").size(); + test(admin->getServerState("server") == IceGrid::Inactive); admin->enableServer("server", false); try @@ -263,6 +269,9 @@ allTests(const Ice::CommunicatorPtr& communicator) catch(const Ice::NoEndpointException&) { } + + test(query->findAllObjectsByType("Test").size() == count - 1); + try { admin->startServer("server"); @@ -292,6 +301,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { } test(admin->getServerState("server-manual") == IceGrid::Inactive); + test(query->findAllObjectsByType("Test").size() == count - 2); test(admin->getServerState("server-always") == IceGrid::Active); admin->enableServer("server-always", false); @@ -314,7 +324,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { } test(admin->getServerState("server-always") == IceGrid::Inactive); - + test(query->findAllObjectsByType("Test").size() == count - 3); test(admin->getServerState("server") == IceGrid::Inactive); admin->enableServer("server", true); @@ -335,6 +345,8 @@ allTests(const Ice::CommunicatorPtr& communicator) test(admin->getServerPid("server") == pid); admin->stopServer("server"); test(admin->getServerState("server") == IceGrid::Inactive); + + test(query->findAllObjectsByType("Test").size() == count - 2); } catch(const Ice::LocalException& ex) { diff --git a/cpp/test/IceGrid/activation/application.xml b/cpp/test/IceGrid/activation/application.xml index 2b5ea2c2a72..a73b1cde237 100644 --- a/cpp/test/IceGrid/activation/application.xml +++ b/cpp/test/IceGrid/activation/application.xml @@ -14,9 +14,9 @@ activation-timeout="${activation-timeout}" deactivation-timeout="${deactivation-timeout}"> <adapter name="TestAdapter" endpoints="default"> - <object identity="${server}" type="Test"/> - <allocatable identity="${server}" type="Test"/> - </adapter> + <object identity="${server}" type="Test"/> + <allocatable identity="${server}" type="Test"/> + </adapter> <property name="ActivationDelay" value="${activation-delay}"/> <property name="DeactivationDelay" value="${deactivation-delay}"/> <property name="FailOnStartup" value="${fail-on-startup}"/> @@ -53,13 +53,13 @@ <server id="invalid-exe" exe="server2" activation="on-demand"> <adapter name="TestAdapter" endpoints="default"> - <object identity="${server}" type="Test"/> - </adapter> + <object identity="${server}" type="Test"/> + </adapter> </server> <server id="invalid-pwd" exe="./server" pwd="./bogus"> <adapter name="TestAdapter" endpoints="default"> - <object identity="${server}" type="Test"/> - </adapter> + <object identity="${server}" type="Test"/> + </adapter> </server> <server id="invalid-exe-no-oa" exe="./server2"> <property name="Ice.Admin.Endpoints" value=""/> diff --git a/cpp/test/IceGrid/allocation/AllTests.cpp b/cpp/test/IceGrid/allocation/AllTests.cpp index 82b7131aa6e..e0c0871c4d5 100644 --- a/cpp/test/IceGrid/allocation/AllTests.cpp +++ b/cpp/test/IceGrid/allocation/AllTests.cpp @@ -480,7 +480,6 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->setAllocationTimeout(0); session2->setAllocationTimeout(0); - try { obj = session1->allocateObjectByType("::Unknown"); @@ -595,6 +594,17 @@ allTests(const Ice::CommunicatorPtr& communicator) session1->releaseObject(obj->ice_getIdentity()); + admin->enableServer("ObjectAllocation", false); + try + { + session1->allocateObjectByType("::Test"); + test(false); + } + catch(const AllocationException&) + { + } + admin->enableServer("ObjectAllocation", true); + cout << "ok" << endl; cout << "testing object allocation timeout... " << flush; @@ -817,6 +827,17 @@ allTests(const Ice::CommunicatorPtr& communicator) session2->releaseObject(obj1->ice_getIdentity()); session2->releaseObject(obj2->ice_getIdentity()); + admin->enableServer("ServerAllocation", false); + try + { + session1->allocateObjectByType("::TestServer1"); + test(false); + } + catch(const AllocationException&) + { + } + admin->enableServer("ServerAllocation", true); + cout << "ok" << endl; cout << "testing concurrent allocations... " << flush; diff --git a/cpp/test/IceGrid/allocation/application.xml b/cpp/test/IceGrid/allocation/application.xml index 27c8ad021f7..2855186151b 100644 --- a/cpp/test/IceGrid/allocation/application.xml +++ b/cpp/test/IceGrid/allocation/application.xml @@ -27,8 +27,8 @@ <server id="ObjectAllocation" exe="${server.dir}/server" activation="on-demand" pwd="."> <adapter name="Server" endpoints="default"> - <allocatable identity="allocatable" type="::Test"/> - <allocatable identity="allocatablebis" type="::TestBis"/> + <allocatable identity="allocatable" type="::Test"/> + <allocatable identity="allocatablebis" type="::TestBis"/> <object identity="nonallocatable" type="::Test"/> <object identity="nonallocatable2" type="::NotAllocatable"/> |