summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp237
1 files changed, 224 insertions, 13 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 474ef43ced2..216525c0314 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -22,6 +22,7 @@
#include <IceGrid/Session.h>
#include <IceGrid/Topics.h>
#include <IceGrid/DB.h>
+#include <IceGrid/IceGrid.h>
#include <algorithm>
#include <functional>
@@ -65,6 +66,53 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
abort();
}
+void
+filterAdapterInfos(const string& filter,
+ const string& replicaGroupId,
+ const RegistryPluginFacadeIPtr& pluginFacade,
+ const Ice::ConnectionPtr& con,
+ const Ice::Context& ctx,
+ AdapterInfoSeq& infos)
+{
+ if(infos.empty() || !pluginFacade->hasReplicaGroupFilters())
+ {
+ return;
+ }
+
+ vector<ReplicaGroupFilterPtr> filters = pluginFacade->getReplicaGroupFilters(filter);
+ if(filters.empty())
+ {
+ return;
+ }
+
+ Ice::StringSeq adapterIds;
+ adapterIds.reserve(infos.size());
+ for(vector<AdapterInfo>::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ adapterIds.push_back(p->id);
+ }
+
+ for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
+ {
+ adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx);
+ }
+
+ vector<AdapterInfo> filteredAdpts;
+ filteredAdpts.reserve(infos.size());
+ for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
+ {
+ for(vector<AdapterInfo>::const_iterator r = infos.begin(); r != infos.end(); ++r)
+ {
+ if(*q == r->id)
+ {
+ filteredAdpts.push_back(*r);
+ break;
+ }
+ }
+ }
+ infos.swap(filteredAdpts);
+}
+
}
Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
@@ -89,6 +137,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
_connectionPool(plugin->getConnectionPool()),
_databasePlugin(plugin),
+ _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
_lock(0)
{
ServerEntrySeq entries;
@@ -123,6 +172,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection));
_registryObserverTopic->registryUp(info);
+
+ _pluginFacade->setDatabase(this);
}
Database::~Database()
@@ -142,8 +193,10 @@ Database::getInstanceName() const
}
void
-Database::destroyTopics()
+Database::destroy()
{
+ _pluginFacade->setDatabase(0);
+
_registryObserverTopic->destroy();
_nodeObserverTopic->destroy();
_applicationObserverTopic->destroy();
@@ -951,7 +1004,8 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
}
Ice::ObjectPrx
-Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding)
+Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
+ const Ice::Context& ctx)
{
DatabaseConnectionPtr connection = _connectionPool->newConnection();
AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
@@ -965,6 +1019,7 @@ Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& en
Ice::EndpointSeq endpoints;
vector<AdapterInfo> infos = adaptersWrapper->findByReplicaGroupId(id);
+ filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
for(unsigned int i = 0; i < infos.size(); ++i)
{
if(infos[i].proxy->ice_getEncodingVersion() < encoding)
@@ -1069,14 +1124,52 @@ Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId,
void
Database::getLocatorAdapterInfo(const string& id,
+ const Ice::ConnectionPtr& connection,
+ const Ice::Context& context,
LocatorAdapterInfoSeq& adpts,
int& count,
bool& replicaGroup,
bool& roundRobin,
const set<string>& excludes)
{
- Lock sync(*this); // Make sure this isn't call during an update.
- _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, excludes);
+ string filter;
+ {
+ Lock sync(*this); // Make sure this isn't call during an update.
+ _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes);
+ }
+
+ if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty())
+ {
+ vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter);
+ if(!filters.empty())
+ {
+ Ice::StringSeq adapterIds;
+ for(LocatorAdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ {
+ adapterIds.push_back(q->id);
+ }
+
+ for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
+ {
+ adapterIds = (*q)->filter(id, adapterIds, connection, context);
+ }
+
+ LocatorAdapterInfoSeq filteredAdpts;
+ filteredAdpts.reserve(adpts.size());
+ for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
+ {
+ for(LocatorAdapterInfoSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
+ {
+ if(*q == r->id)
+ {
+ filteredAdpts.push_back(*r);
+ break;
+ }
+ }
+ }
+ adpts.swap(filteredAdpts);
+ }
+ }
}
bool
@@ -1123,7 +1216,7 @@ Database::getAdapterInfo(const string& id)
// group...
//
infos = adaptersWrapper->findByReplicaGroupId(id);
- if(infos.size() == 0)
+ if(infos.empty())
{
throw AdapterNotExistException(id);
}
@@ -1131,6 +1224,112 @@ Database::getAdapterInfo(const string& id)
return infos;
}
+AdapterInfoSeq
+Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
+{
+ //
+ // First we check if the given adapter id is associated to a
+ // server, if that's the case we get the adapter proxy from the
+ // server.
+ //
+ try
+ {
+ AdapterInfoSeq infos;
+ ReplicaGroupEntryPtr replicaGroup;
+ {
+ Lock sync(*this); // Make sure this isn't call during an update.
+
+ AdapterEntryPtr entry = _adapterCache.get(id);
+ infos = entry->getAdapterInfo();
+ replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry);
+ }
+ if(replicaGroup)
+ {
+ filterAdapterInfos(replicaGroup->getFilter(), id, _pluginFacade, con, ctx, infos);
+ }
+ return infos;
+ }
+ catch(AdapterNotExistException&)
+ {
+ }
+
+ //
+ // Otherwise, we check the adapter endpoint table -- if there's an
+ // entry the adapter is managed by the registry itself.
+ //
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ AdapterInfoSeq infos;
+ try
+ {
+ infos.push_back(adaptersWrapper->find(id));
+ }
+ catch(const NotFoundException&)
+ {
+ //
+ // If it's not a regular object adapter, perhaps it's a replica
+ // group...
+ //
+ infos = adaptersWrapper->findByReplicaGroupId(id);
+ if(infos.empty())
+ {
+ throw AdapterNotExistException(id);
+ }
+ filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
+ }
+ return infos;
+}
+
+string
+Database::getAdapterServer(const string& id) const
+{
+ try
+ {
+ Lock sync(*this); // Make sure this isn't call during an update.
+ ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
+ if(adapter)
+ {
+ return adapter->getServerId();
+ }
+ }
+ catch(AdapterNotExistException&)
+ {
+ }
+ return "";
+}
+
+string
+Database::getAdapterApplication(const string& id) const
+{
+ try
+ {
+ Lock sync(*this); // Make sure this isn't call during an update.
+ return _adapterCache.get(id)->getApplication();
+ }
+ catch(AdapterNotExistException&)
+ {
+ }
+ return "";
+}
+
+string
+Database::getAdapterNode(const string& id) const
+{
+ try
+ {
+ Lock sync(*this); // Make sure this isn't call during an update.
+ ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
+ if(adapter)
+ {
+ return adapter->getNodeName();
+ }
+ }
+ catch(AdapterNotExistException&)
+ {
+ }
+ return "";
+}
+
Ice::StringSeq
Database::getAllAdapters(const string& expression)
{
@@ -1505,9 +1704,9 @@ Database::getObjectProxy(const Ice::Identity& id)
}
Ice::ObjectPrx
-Database::getObjectByType(const string& type)
+Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
{
- Ice::ObjectProxySeq objs = getObjectsByType(type);
+ Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
if(objs.empty())
{
return 0;
@@ -1516,9 +1715,10 @@ Database::getObjectByType(const string& type)
}
Ice::ObjectPrx
-Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample)
+Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
+ const Ice::Context& ctx)
{
- Ice::ObjectProxySeq objs = getObjectsByType(type);
+ Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
if(objs.empty())
{
return 0;
@@ -1548,7 +1748,7 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample
Ice::ObjectProxySeq
-Database::getObjectsByType(const string& type)
+Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
{
Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
@@ -1559,6 +1759,18 @@ Database::getObjectsByType(const string& type)
{
proxies.push_back(infos[i].proxy);
}
+
+ if(con && !proxies.empty() && _pluginFacade->hasTypeFilters())
+ {
+ vector<TypeFilterPtr> filters = _pluginFacade->getTypeFilters(type);
+ if(!filters.empty())
+ {
+ for(vector<TypeFilterPtr>::const_iterator p = filters.begin(); p != filters.end(); ++p)
+ {
+ proxies = (*p)->filter(type, proxies, con, ctx);
+ }
+ }
+ }
return proxies;
}
@@ -2088,7 +2300,7 @@ Database::reload(const ApplicationHelper& oldApp,
{
ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
assert(entry);
- entry->update(application, r->loadBalancing);
+ entry->update(application, r->loadBalancing, r->filter);
}
catch(const AdapterNotExistException&)
{
@@ -2307,8 +2519,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
{
#if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
Ice::StringSeq nodes;
- for(set<string>::const_iterator p = unreachableNodes.begin(); p != unreachableNodes.end\
- (); ++p)
+ for(set<string>::const_iterator p = unreachableNodes.begin(); p != unreachableNodes.end(); ++p)
{
nodes.push_back(*p);
}