summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-10-12 17:21:02 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-10-12 17:21:02 +0000
commitaac841a43441f7911056ddbc6fc8c21aa6126431 (patch)
tree8dcad281655b53155e9c10e72b07d436208787a8 /cpp/src/IceGrid/Database.cpp
parentchanging getLogger to return a custom Python impl (diff)
downloadice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.bz2
ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.xz
ice-aac841a43441f7911056ddbc6fc8c21aa6126431.zip
Added support for replica groups and removed replicated adapters.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp267
1 files changed, 142 insertions, 125 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 3523a17614c..7df15736697 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -24,6 +24,7 @@ using namespace IceGrid;
const string Database::_descriptorDbName = "applications";
const string Database::_adapterDbName = "adapters";
+const string Database::_replicaGroupDbName = "replica-groups";
const string Database::_objectDbName = "objects";
namespace IceGrid
@@ -50,17 +51,7 @@ public:
virtual Ice::ObjectPrx
getDirectProxy(const Ice::Current& current) const
{
- istringstream is(current.id.name);
- unsigned int size;
- is >> size;
- char c;
- is >> c;
- assert(c == '-');
- string id;
- is >> id;
- string adapterId = id.substr(0, size);
- string replicaId = (id.size() > size) ? id.substr(size + 1) : string();
- return _database->getAdapterDirectProxy(adapterId, replicaId);
+ return _database->getAdapterDirectProxy(current.id.name);
}
virtual void
@@ -137,6 +128,7 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter,
_descriptors(_connection, _descriptorDbName),
_objects(_connection, _objectDbName),
_adapters(_connection, _adapterDbName),
+ _replicaGroups(_connection, _replicaGroupDbName),
_lock(0),
_serial(0)
{
@@ -509,10 +501,11 @@ Database::getAllNodeServers(const string& node)
}
bool
-Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId, const Ice::ObjectPrx& proxy)
+Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
{
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxiesDict adapters(connection, _adapterDbName);
+ StringProxyDict adapters(connection, _adapterDbName);
+ StringStringSeqDict replicaGroups(connection, _replicaGroupDbName);
if(proxy)
{
Lock sync(*this);
@@ -521,39 +514,46 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId
return false;
}
- StringObjectProxiesDict::iterator p = adapters.find(adapterId);
+ StringProxyDict::iterator p = adapters.find(adapterId);
if(p != adapters.end())
{
- StringObjectProxyDict proxies = p->second;
- proxies[replicaId] = proxy;
- p.set(proxies);
-
+ p.set(proxy);
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "updated adapter `" << adapterId << "'";
- if(!replicaId.empty())
- {
- out << " from replica `" << replicaId << "'";
- }
}
}
else
{
- StringObjectProxyDict proxies;
- proxies[replicaId] = proxy;
- adapters.put(StringObjectProxiesDict::value_type(adapterId, proxies));
-
+ adapters.put(StringProxyDict::value_type(adapterId, proxy));
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "added adapter `" << adapterId << "'";
- if(!replicaId.empty())
+ }
+ }
+
+ if(!replicaGroupId.empty())
+ {
+ StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId);
+ if(q != replicaGroups.end())
+ {
+ if(find(q->second.begin(), q->second.end(), adapterId) == q->second.end())
{
- out << " from replica `" << replicaId << "'";
+ Ice::StringSeq adapters = q->second;
+ adapters.push_back(adapterId);
+ q.set(adapters);
}
}
+ else
+ {
+ Ice::StringSeq adapters;
+ adapters.push_back(adapterId);
+ replicaGroups.put(StringStringSeqDict::value_type(replicaGroupId, adapters));
+ }
}
+
return true;
}
else
@@ -564,59 +564,43 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId
return false;
}
- StringObjectProxiesDict::iterator p = adapters.find(adapterId);
- if(p == adapters.end())
+ if(adapters.erase(adapterId) == 0)
{
return true;
}
- StringObjectProxyDict proxies = p->second;
- if(proxies.erase(replicaId) == 0)
+ if(_traceLevels->adapter > 0)
{
- throw AdapterNotExistException(adapterId, replicaId);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << "removed adapter `" << adapterId << "'";
}
- if(proxies.empty())
+ if(!replicaGroupId.empty())
{
- adapters.erase(p);
-
- if(_traceLevels->adapter > 0)
+ StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId);
+ if(q == replicaGroups.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
+ return true;
}
+
+ Ice::StringSeq adapters = q->second;
+ adapters.erase(remove(adapters.begin(), adapters.end(), adapterId), adapters.end());
+ q.set(adapters);
}
- else
- {
- p.set(proxies);
- if(_traceLevels->adapter > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
- if(!replicaId.empty())
- {
- out << " from replica `" << replicaId << "'";
- }
- }
- }
return true;
}
}
Ice::ObjectPrx
-Database::getAdapterDirectProxy(const string& adapterId, const string& replicaId)
+Database::getAdapterDirectProxy(const string& adapterId)
{
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxiesDict adapters(connection, _adapterDbName);
- StringObjectProxiesDict::const_iterator p = adapters.find(adapterId);
+ StringProxyDict adapters(connection, _adapterDbName);
+ StringProxyDict::const_iterator p = adapters.find(adapterId);
if(p != adapters.end())
{
- StringObjectProxyDict::const_iterator q = p->second.find(replicaId);
- if(q != p->second.end())
- {
- return q->second;
- }
+ return p->second;
}
return 0;
}
@@ -637,28 +621,39 @@ Database::removeAdapter(const string& adapterId)
}
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxiesDict adapters(connection, _adapterDbName);
- StringObjectProxiesDict::iterator p = adapters.find(adapterId);
+ StringProxyDict adapters(connection, _adapterDbName);
+ StringProxyDict::iterator p = adapters.find(adapterId);
if(p != adapters.end())
{
adapters.erase(p);
-
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "removed adapter `" << adapterId << "'";
}
+ return;
}
- else
+
+ StringStringSeqDict replicaGroups(connection, _replicaGroupDbName);
+ StringStringSeqDict::iterator q = replicaGroups.find(adapterId);
+ if(q != replicaGroups.end())
{
- throw AdapterNotExistException(adapterId, "");
+ replicaGroups.erase(q);
+ if(_traceLevels->adapter > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << "removed adapter `" << adapterId << "'";
+ }
+ return;
}
+
+ throw AdapterNotExistException(adapterId);
}
AdapterPrx
-Database::getAdapter(const string& id, const string& replicaId)
+Database::getAdapter(const string& id, const string& replicaGroupId)
{
- return _adapterCache.get(id)->getProxy(replicaId);
+ return _adapterCache.getServerAdapter(id, false)->getProxy(replicaGroupId);
}
vector<pair<string, AdapterPrx> >
@@ -669,18 +664,12 @@ Database::getAdapters(const string& id, int& endpointCount)
// server, if that's the case we get the adapter proxy from the
// server.
//
- auto_ptr<Ice::UserException> exception;
try
{
return _adapterCache.get(id)->getProxies(endpointCount);
}
- catch(AdapterNotExistException& ex)
- {
- exception.reset(dynamic_cast<AdapterNotExistException*>(ex.ice_clone()));
- }
- catch(const NodeUnreachableException& ex)
+ catch(AdapterNotExistException&)
{
- exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone()));
}
//
@@ -688,29 +677,42 @@ Database::getAdapters(const string& id, int& endpointCount)
// entry the adapter is managed by the registry itself.
//
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxiesDict adapters(connection, _adapterDbName);
- StringObjectProxiesDict::const_iterator p = adapters.find(id);
+ StringProxyDict adapters(connection, _adapterDbName);
+ StringProxyDict::const_iterator p = adapters.find(id);
if(p != adapters.end())
{
vector<pair<string, AdapterPrx> > adapters;
- for(StringObjectProxyDict::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ Ice::Identity identity;
+ identity.category = "IceGridAdapter";
+ identity.name = id;
+ AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity));
+ adapters.push_back(make_pair(id, adpt));
+ return adapters;
+ }
+
+ //
+ // If it's not a regular object adapter, perhaps it's a replica
+ // group...
+ //
+ StringStringSeqDict replicaGroups(connection, _replicaGroupDbName);
+ StringStringSeqDict::const_iterator q = replicaGroups.find(id);
+ if(q != replicaGroups.end())
+ {
+ vector<pair<string, AdapterPrx> > adapters;
+ for(Ice::StringSeq::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
{
Ice::Identity identity;
identity.category = "IceGridAdapter";
- ostringstream os;
- os << id.size() << "-" << id << "-" << q->first;
- identity.name = os.str();
- adapters.push_back(
- make_pair(q->first, AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity))));
+ identity.name = *r;
+ AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity));
+ adapters.push_back(make_pair(*r, adpt));
}
random_shuffle(adapters.begin(), adapters.end());
endpointCount = adapters.size();
return adapters;
}
- assert(exception.get());
- exception->ice_throw();
- return vector<pair<string, AdapterPrx> >(); // Keeps the compiler happy.
+ throw AdapterNotExistException(id);
}
Ice::StringSeq
@@ -720,7 +722,9 @@ Database::getAllAdapters(const string& expression)
vector<string> result;
vector<string> ids = _adapterCache.getAll(expression);
result.swap(ids);
- ids = getMatchingKeys<StringObjectProxiesDict>(_adapters, expression);
+ ids = getMatchingKeys<StringProxyDict>(_adapters, expression);
+ result.insert(result.end(), ids.begin(), ids.end());
+ ids = getMatchingKeys<StringStringSeqDict>(_replicaGroups, expression);
result.insert(result.end(), ids.begin(), ids.end());
return result;
}
@@ -991,7 +995,9 @@ Database::checkServerForAddition(const string& id)
void
Database::checkAdapterForAddition(const string& id)
{
- if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end())
+ if(_adapterCache.has(id) ||
+ _adapters.find(id) != _adapters.end() ||
+ _replicaGroups.find(id) != _replicaGroups.end())
{
DeploymentException ex;
ex.reason = "adapter `" + id + "' is already registered";
@@ -1020,11 +1026,11 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries)
_nodeCache.get(n->first, true)->addDescriptor(application, n->second);
}
- const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters;
- for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
+ const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
+ for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
{
assert(!r->id.empty());
- _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing);
+ _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing);
for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
{
_objectCache.add(application, r->id, "", *o);
@@ -1041,35 +1047,37 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries)
void
Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
{
- const NodeDescriptorDict& nodes = app.getInstance().nodes;
- const string application = app.getInstance().name;
- for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
+ map<string, ServerInfo> servers = app.getServerInfos();
+ for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- _nodeCache.get(n->first)->removeDescriptor(application);
+ entries.push_back(_serverCache.remove(p->first));
}
- const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters;
- for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
+ const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
+ for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
{
for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
{
_objectCache.remove(o->id);
}
- _adapterCache.get(r->id, false)->disableReplication();
+ _adapterCache.remove(r->id);
}
- map<string, ServerInfo> servers = app.getServerInfos();
- for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ const NodeDescriptorDict& nodes = app.getInstance().nodes;
+ const string application = app.getInstance().name;
+ for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
{
- entries.push_back(_serverCache.remove(p->first));
+ _nodeCache.get(n->first)->removeDescriptor(application);
}
}
void
Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newApp, ServerEntrySeq& entries)
{
+ const string application = oldApp.getInstance().name;
+
//
- // Figure out which servers need to removed/updated and added.
+ // Remove destroyed servers.
//
map<string, ServerInfo> oldServers = oldApp.getServerInfos();
map<string, ServerInfo> newServers = newApp.getServerInfos();
@@ -1088,10 +1096,6 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp
load.push_back(p->second);
}
}
-
- //
- // Remove destroyed servers.
- //
for(p = oldServers.begin(); p != oldServers.end(); ++p)
{
map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
@@ -1102,10 +1106,35 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp
}
//
+ // Remove destroyed replica groups.
+ //
+ const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups;
+ const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups;
+ ReplicaGroupDescriptorSeq::const_iterator r;
+ for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r)
+ {
+ ReplicaGroupDescriptorSeq::const_iterator t;
+ for(t = newAdpts.begin(); t != newAdpts.end(); ++t)
+ {
+ if(t->id == r->id)
+ {
+ break;
+ }
+ }
+ for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ {
+ _objectCache.remove(o->id);
+ }
+ if(t == newAdpts.end())
+ {
+ _adapterCache.remove(r->id);
+ }
+ }
+
+ //
// Remove all the node descriptors.
//
const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes;
- const string application = oldApp.getInstance().name;
NodeDescriptorDict::const_iterator n;
for(n = oldNodes.begin(); n != oldNodes.end(); ++n)
{
@@ -1113,26 +1142,20 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp
}
//
- // Remove all the replicated adapters.
+ // Add back node descriptors.
//
- const ReplicatedAdapterDescriptorSeq& oldAdpts = oldApp.getInstance().replicatedAdapters;
- ReplicatedAdapterDescriptorSeq::const_iterator r;
- for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r)
+ const NodeDescriptorDict& newNodes = newApp.getInstance().nodes;
+ for(n = newNodes.begin(); n != newNodes.end(); ++n)
{
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
- {
- _objectCache.remove(o->id);
- }
- _adapterCache.get(r->id, false)->disableReplication();
+ _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
}
//
- // Add back replicated adapters.
+ // Add back replica groups.
//
- const ReplicatedAdapterDescriptorSeq& newAdpts = newApp.getInstance().replicatedAdapters;
for(r = newAdpts.begin(); r != newAdpts.end(); ++r)
{
- _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing);
+ _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing);
for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
{
_objectCache.add(application, r->id, "", *o);
@@ -1140,14 +1163,8 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp
}
//
- // Add back node descriptors.
+ // Add back servers.
//
- const NodeDescriptorDict& newNodes = newApp.getInstance().nodes;
- for(n = newNodes.begin(); n != newNodes.end(); ++n)
- {
- _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
- }
-
for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q)
{
entries.push_back(_serverCache.add(*q));