diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 237 |
1 files changed, 224 insertions, 13 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 474ef43ced2..216525c0314 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -22,6 +22,7 @@ #include <IceGrid/Session.h> #include <IceGrid/Topics.h> #include <IceGrid/DB.h> +#include <IceGrid/IceGrid.h> #include <algorithm> #include <functional> @@ -65,6 +66,53 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) abort(); } +void +filterAdapterInfos(const string& filter, + const string& replicaGroupId, + const RegistryPluginFacadeIPtr& pluginFacade, + const Ice::ConnectionPtr& con, + const Ice::Context& ctx, + AdapterInfoSeq& infos) +{ + if(infos.empty() || !pluginFacade->hasReplicaGroupFilters()) + { + return; + } + + vector<ReplicaGroupFilterPtr> filters = pluginFacade->getReplicaGroupFilters(filter); + if(filters.empty()) + { + return; + } + + Ice::StringSeq adapterIds; + adapterIds.reserve(infos.size()); + for(vector<AdapterInfo>::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + adapterIds.push_back(p->id); + } + + for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q) + { + adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx); + } + + vector<AdapterInfo> filteredAdpts; + filteredAdpts.reserve(infos.size()); + for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q) + { + for(vector<AdapterInfo>::const_iterator r = infos.begin(); r != infos.end(); ++r) + { + if(*q == r->id) + { + filteredAdpts.push_back(*r); + break; + } + } + } + infos.swap(filteredAdpts); +} + } Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, @@ -89,6 +137,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), _connectionPool(plugin->getConnectionPool()), _databasePlugin(plugin), + _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())), _lock(0) { ServerEntrySeq entries; @@ -123,6 +172,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection)); _registryObserverTopic->registryUp(info); + + _pluginFacade->setDatabase(this); } Database::~Database() @@ -142,8 +193,10 @@ Database::getInstanceName() const } void -Database::destroyTopics() +Database::destroy() { + _pluginFacade->setDatabase(0); + _registryObserverTopic->destroy(); _nodeObserverTopic->destroy(); _applicationObserverTopic->destroy(); @@ -951,7 +1004,8 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr } Ice::ObjectPrx -Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding) +Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con, + const Ice::Context& ctx) { DatabaseConnectionPtr connection = _connectionPool->newConnection(); AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); @@ -965,6 +1019,7 @@ Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& en Ice::EndpointSeq endpoints; vector<AdapterInfo> infos = adaptersWrapper->findByReplicaGroupId(id); + filterAdapterInfos("", id, _pluginFacade, con, ctx, infos); for(unsigned int i = 0; i < infos.size(); ++i) { if(infos[i].proxy->ice_getEncodingVersion() < encoding) @@ -1069,14 +1124,52 @@ Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, void Database::getLocatorAdapterInfo(const string& id, + const Ice::ConnectionPtr& connection, + const Ice::Context& context, LocatorAdapterInfoSeq& adpts, int& count, bool& replicaGroup, bool& roundRobin, const set<string>& excludes) { - Lock sync(*this); // Make sure this isn't call during an update. - _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, excludes); + string filter; + { + Lock sync(*this); // Make sure this isn't call during an update. + _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes); + } + + if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty()) + { + vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter); + if(!filters.empty()) + { + Ice::StringSeq adapterIds; + for(LocatorAdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + { + adapterIds.push_back(q->id); + } + + for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q) + { + adapterIds = (*q)->filter(id, adapterIds, connection, context); + } + + LocatorAdapterInfoSeq filteredAdpts; + filteredAdpts.reserve(adpts.size()); + for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q) + { + for(LocatorAdapterInfoSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) + { + if(*q == r->id) + { + filteredAdpts.push_back(*r); + break; + } + } + } + adpts.swap(filteredAdpts); + } + } } bool @@ -1123,7 +1216,7 @@ Database::getAdapterInfo(const string& id) // group... // infos = adaptersWrapper->findByReplicaGroupId(id); - if(infos.size() == 0) + if(infos.empty()) { throw AdapterNotExistException(id); } @@ -1131,6 +1224,112 @@ Database::getAdapterInfo(const string& id) return infos; } +AdapterInfoSeq +Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con, const Ice::Context& ctx) +{ + // + // First we check if the given adapter id is associated to a + // server, if that's the case we get the adapter proxy from the + // server. + // + try + { + AdapterInfoSeq infos; + ReplicaGroupEntryPtr replicaGroup; + { + Lock sync(*this); // Make sure this isn't call during an update. + + AdapterEntryPtr entry = _adapterCache.get(id); + infos = entry->getAdapterInfo(); + replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry); + } + if(replicaGroup) + { + filterAdapterInfos(replicaGroup->getFilter(), id, _pluginFacade, con, ctx, infos); + } + return infos; + } + catch(AdapterNotExistException&) + { + } + + // + // Otherwise, we check the adapter endpoint table -- if there's an + // entry the adapter is managed by the registry itself. + // + DatabaseConnectionPtr connection = _connectionPool->newConnection(); + AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + AdapterInfoSeq infos; + try + { + infos.push_back(adaptersWrapper->find(id)); + } + catch(const NotFoundException&) + { + // + // If it's not a regular object adapter, perhaps it's a replica + // group... + // + infos = adaptersWrapper->findByReplicaGroupId(id); + if(infos.empty()) + { + throw AdapterNotExistException(id); + } + filterAdapterInfos("", id, _pluginFacade, con, ctx, infos); + } + return infos; +} + +string +Database::getAdapterServer(const string& id) const +{ + try + { + Lock sync(*this); // Make sure this isn't call during an update. + ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id)); + if(adapter) + { + return adapter->getServerId(); + } + } + catch(AdapterNotExistException&) + { + } + return ""; +} + +string +Database::getAdapterApplication(const string& id) const +{ + try + { + Lock sync(*this); // Make sure this isn't call during an update. + return _adapterCache.get(id)->getApplication(); + } + catch(AdapterNotExistException&) + { + } + return ""; +} + +string +Database::getAdapterNode(const string& id) const +{ + try + { + Lock sync(*this); // Make sure this isn't call during an update. + ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id)); + if(adapter) + { + return adapter->getNodeName(); + } + } + catch(AdapterNotExistException&) + { + } + return ""; +} + Ice::StringSeq Database::getAllAdapters(const string& expression) { @@ -1505,9 +1704,9 @@ Database::getObjectProxy(const Ice::Identity& id) } Ice::ObjectPrx -Database::getObjectByType(const string& type) +Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { - Ice::ObjectProxySeq objs = getObjectsByType(type); + Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx); if(objs.empty()) { return 0; @@ -1516,9 +1715,10 @@ Database::getObjectByType(const string& type) } Ice::ObjectPrx -Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample) +Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con, + const Ice::Context& ctx) { - Ice::ObjectProxySeq objs = getObjectsByType(type); + Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx); if(objs.empty()) { return 0; @@ -1548,7 +1748,7 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample Ice::ObjectProxySeq -Database::getObjectsByType(const string& type) +Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); @@ -1559,6 +1759,18 @@ Database::getObjectsByType(const string& type) { proxies.push_back(infos[i].proxy); } + + if(con && !proxies.empty() && _pluginFacade->hasTypeFilters()) + { + vector<TypeFilterPtr> filters = _pluginFacade->getTypeFilters(type); + if(!filters.empty()) + { + for(vector<TypeFilterPtr>::const_iterator p = filters.begin(); p != filters.end(); ++p) + { + proxies = (*p)->filter(type, proxies, con, ctx); + } + } + } return proxies; } @@ -2088,7 +2300,7 @@ Database::reload(const ApplicationHelper& oldApp, { ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id)); assert(entry); - entry->update(application, r->loadBalancing); + entry->update(application, r->loadBalancing, r->filter); } catch(const AdapterNotExistException&) { @@ -2307,8 +2519,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp, { #if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES) Ice::StringSeq nodes; - for(set<string>::const_iterator p = unreachableNodes.begin(); p != unreachableNodes.end\ - (); ++p) + for(set<string>::const_iterator p = unreachableNodes.begin(); p != unreachableNodes.end(); ++p) { nodes.push_back(*p); } |