diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-10-12 17:21:02 +0000 |
commit | aac841a43441f7911056ddbc6fc8c21aa6126431 (patch) | |
tree | 8dcad281655b53155e9c10e72b07d436208787a8 /cpp/src/IceGrid/Database.cpp | |
parent | changing getLogger to return a custom Python impl (diff) | |
download | ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.bz2 ice-aac841a43441f7911056ddbc6fc8c21aa6126431.tar.xz ice-aac841a43441f7911056ddbc6fc8c21aa6126431.zip |
Added support for replica groups and removed replicated adapters.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 267 |
1 files changed, 142 insertions, 125 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 3523a17614c..7df15736697 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -24,6 +24,7 @@ using namespace IceGrid; const string Database::_descriptorDbName = "applications"; const string Database::_adapterDbName = "adapters"; +const string Database::_replicaGroupDbName = "replica-groups"; const string Database::_objectDbName = "objects"; namespace IceGrid @@ -50,17 +51,7 @@ public: virtual Ice::ObjectPrx getDirectProxy(const Ice::Current& current) const { - istringstream is(current.id.name); - unsigned int size; - is >> size; - char c; - is >> c; - assert(c == '-'); - string id; - is >> id; - string adapterId = id.substr(0, size); - string replicaId = (id.size() > size) ? id.substr(size + 1) : string(); - return _database->getAdapterDirectProxy(adapterId, replicaId); + return _database->getAdapterDirectProxy(current.id.name); } virtual void @@ -137,6 +128,7 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter, _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName), + _replicaGroups(_connection, _replicaGroupDbName), _lock(0), _serial(0) { @@ -509,10 +501,11 @@ Database::getAllNodeServers(const string& node) } bool -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId, const Ice::ObjectPrx& proxy) +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); + StringProxyDict adapters(connection, _adapterDbName); + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); if(proxy) { Lock sync(*this); @@ -521,39 +514,46 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId return false; } - StringObjectProxiesDict::iterator p = adapters.find(adapterId); + StringProxyDict::iterator p = adapters.find(adapterId); if(p != adapters.end()) { - StringObjectProxyDict proxies = p->second; - proxies[replicaId] = proxy; - p.set(proxies); - + p.set(proxy); if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "updated adapter `" << adapterId << "'"; - if(!replicaId.empty()) - { - out << " from replica `" << replicaId << "'"; - } } } else { - StringObjectProxyDict proxies; - proxies[replicaId] = proxy; - adapters.put(StringObjectProxiesDict::value_type(adapterId, proxies)); - + adapters.put(StringProxyDict::value_type(adapterId, proxy)); if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "added adapter `" << adapterId << "'"; - if(!replicaId.empty()) + } + } + + if(!replicaGroupId.empty()) + { + StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId); + if(q != replicaGroups.end()) + { + if(find(q->second.begin(), q->second.end(), adapterId) == q->second.end()) { - out << " from replica `" << replicaId << "'"; + Ice::StringSeq adapters = q->second; + adapters.push_back(adapterId); + q.set(adapters); } } + else + { + Ice::StringSeq adapters; + adapters.push_back(adapterId); + replicaGroups.put(StringStringSeqDict::value_type(replicaGroupId, adapters)); + } } + return true; } else @@ -564,59 +564,43 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaId return false; } - StringObjectProxiesDict::iterator p = adapters.find(adapterId); - if(p == adapters.end()) + if(adapters.erase(adapterId) == 0) { return true; } - StringObjectProxyDict proxies = p->second; - if(proxies.erase(replicaId) == 0) + if(_traceLevels->adapter > 0) { - throw AdapterNotExistException(adapterId, replicaId); + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << "removed adapter `" << adapterId << "'"; } - if(proxies.empty()) + if(!replicaGroupId.empty()) { - adapters.erase(p); - - if(_traceLevels->adapter > 0) + StringStringSeqDict::iterator q = replicaGroups.find(replicaGroupId); + if(q == replicaGroups.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + return true; } + + Ice::StringSeq adapters = q->second; + adapters.erase(remove(adapters.begin(), adapters.end(), adapterId), adapters.end()); + q.set(adapters); } - else - { - p.set(proxies); - if(_traceLevels->adapter > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; - if(!replicaId.empty()) - { - out << " from replica `" << replicaId << "'"; - } - } - } return true; } } Ice::ObjectPrx -Database::getAdapterDirectProxy(const string& adapterId, const string& replicaId) +Database::getAdapterDirectProxy(const string& adapterId) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::const_iterator p = adapters.find(adapterId); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::const_iterator p = adapters.find(adapterId); if(p != adapters.end()) { - StringObjectProxyDict::const_iterator q = p->second.find(replicaId); - if(q != p->second.end()) - { - return q->second; - } + return p->second; } return 0; } @@ -637,28 +621,39 @@ Database::removeAdapter(const string& adapterId) } Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::iterator p = adapters.find(adapterId); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::iterator p = adapters.find(adapterId); if(p != adapters.end()) { adapters.erase(p); - if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed adapter `" << adapterId << "'"; } + return; } - else + + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); + StringStringSeqDict::iterator q = replicaGroups.find(adapterId); + if(q != replicaGroups.end()) { - throw AdapterNotExistException(adapterId, ""); + replicaGroups.erase(q); + if(_traceLevels->adapter > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << "removed adapter `" << adapterId << "'"; + } + return; } + + throw AdapterNotExistException(adapterId); } AdapterPrx -Database::getAdapter(const string& id, const string& replicaId) +Database::getAdapter(const string& id, const string& replicaGroupId) { - return _adapterCache.get(id)->getProxy(replicaId); + return _adapterCache.getServerAdapter(id, false)->getProxy(replicaGroupId); } vector<pair<string, AdapterPrx> > @@ -669,18 +664,12 @@ Database::getAdapters(const string& id, int& endpointCount) // server, if that's the case we get the adapter proxy from the // server. // - auto_ptr<Ice::UserException> exception; try { return _adapterCache.get(id)->getProxies(endpointCount); } - catch(AdapterNotExistException& ex) - { - exception.reset(dynamic_cast<AdapterNotExistException*>(ex.ice_clone())); - } - catch(const NodeUnreachableException& ex) + catch(AdapterNotExistException&) { - exception.reset(dynamic_cast<NodeUnreachableException*>(ex.ice_clone())); } // @@ -688,29 +677,42 @@ Database::getAdapters(const string& id, int& endpointCount) // entry the adapter is managed by the registry itself. // Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxiesDict adapters(connection, _adapterDbName); - StringObjectProxiesDict::const_iterator p = adapters.find(id); + StringProxyDict adapters(connection, _adapterDbName); + StringProxyDict::const_iterator p = adapters.find(id); if(p != adapters.end()) { vector<pair<string, AdapterPrx> > adapters; - for(StringObjectProxyDict::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + Ice::Identity identity; + identity.category = "IceGridAdapter"; + identity.name = id; + AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)); + adapters.push_back(make_pair(id, adpt)); + return adapters; + } + + // + // If it's not a regular object adapter, perhaps it's a replica + // group... + // + StringStringSeqDict replicaGroups(connection, _replicaGroupDbName); + StringStringSeqDict::const_iterator q = replicaGroups.find(id); + if(q != replicaGroups.end()) + { + vector<pair<string, AdapterPrx> > adapters; + for(Ice::StringSeq::const_iterator r = q->second.begin(); r != q->second.end(); ++r) { Ice::Identity identity; identity.category = "IceGridAdapter"; - ostringstream os; - os << id.size() << "-" << id << "-" << q->first; - identity.name = os.str(); - adapters.push_back( - make_pair(q->first, AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)))); + identity.name = *r; + AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)); + adapters.push_back(make_pair(*r, adpt)); } random_shuffle(adapters.begin(), adapters.end()); endpointCount = adapters.size(); return adapters; } - assert(exception.get()); - exception->ice_throw(); - return vector<pair<string, AdapterPrx> >(); // Keeps the compiler happy. + throw AdapterNotExistException(id); } Ice::StringSeq @@ -720,7 +722,9 @@ Database::getAllAdapters(const string& expression) vector<string> result; vector<string> ids = _adapterCache.getAll(expression); result.swap(ids); - ids = getMatchingKeys<StringObjectProxiesDict>(_adapters, expression); + ids = getMatchingKeys<StringProxyDict>(_adapters, expression); + result.insert(result.end(), ids.begin(), ids.end()); + ids = getMatchingKeys<StringStringSeqDict>(_replicaGroups, expression); result.insert(result.end(), ids.begin(), ids.end()); return result; } @@ -991,7 +995,9 @@ Database::checkServerForAddition(const string& id) void Database::checkAdapterForAddition(const string& id) { - if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end()) + if(_adapterCache.has(id) || + _adapters.find(id) != _adapters.end() || + _replicaGroups.find(id) != _replicaGroups.end()) { DeploymentException ex; ex.reason = "adapter `" + id + "' is already registered"; @@ -1020,11 +1026,11 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries) _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } - const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters; - for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) + const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; + for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { assert(!r->id.empty()); - _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing); + _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.add(application, r->id, "", *o); @@ -1041,35 +1047,37 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries) void Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries) { - const NodeDescriptorDict& nodes = app.getInstance().nodes; - const string application = app.getInstance().name; - for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) + map<string, ServerInfo> servers = app.getServerInfos(); + for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) { - _nodeCache.get(n->first)->removeDescriptor(application); + entries.push_back(_serverCache.remove(p->first)); } - const ReplicatedAdapterDescriptorSeq& adpts = app.getInstance().replicatedAdapters; - for(ReplicatedAdapterDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) + const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; + for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.remove(o->id); } - _adapterCache.get(r->id, false)->disableReplication(); + _adapterCache.remove(r->id); } - map<string, ServerInfo> servers = app.getServerInfos(); - for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) + const NodeDescriptorDict& nodes = app.getInstance().nodes; + const string application = app.getInstance().name; + for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) { - entries.push_back(_serverCache.remove(p->first)); + _nodeCache.get(n->first)->removeDescriptor(application); } } void Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newApp, ServerEntrySeq& entries) { + const string application = oldApp.getInstance().name; + // - // Figure out which servers need to removed/updated and added. + // Remove destroyed servers. // map<string, ServerInfo> oldServers = oldApp.getServerInfos(); map<string, ServerInfo> newServers = newApp.getServerInfos(); @@ -1088,10 +1096,6 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp load.push_back(p->second); } } - - // - // Remove destroyed servers. - // for(p = oldServers.begin(); p != oldServers.end(); ++p) { map<string, ServerInfo>::const_iterator q = newServers.find(p->first); @@ -1102,10 +1106,35 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // + // Remove destroyed replica groups. + // + const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups; + const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups; + ReplicaGroupDescriptorSeq::const_iterator r; + for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r) + { + ReplicaGroupDescriptorSeq::const_iterator t; + for(t = newAdpts.begin(); t != newAdpts.end(); ++t) + { + if(t->id == r->id) + { + break; + } + } + for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) + { + _objectCache.remove(o->id); + } + if(t == newAdpts.end()) + { + _adapterCache.remove(r->id); + } + } + + // // Remove all the node descriptors. // const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes; - const string application = oldApp.getInstance().name; NodeDescriptorDict::const_iterator n; for(n = oldNodes.begin(); n != oldNodes.end(); ++n) { @@ -1113,26 +1142,20 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // - // Remove all the replicated adapters. + // Add back node descriptors. // - const ReplicatedAdapterDescriptorSeq& oldAdpts = oldApp.getInstance().replicatedAdapters; - ReplicatedAdapterDescriptorSeq::const_iterator r; - for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r) + const NodeDescriptorDict& newNodes = newApp.getInstance().nodes; + for(n = newNodes.begin(); n != newNodes.end(); ++n) { - for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) - { - _objectCache.remove(o->id); - } - _adapterCache.get(r->id, false)->disableReplication(); + _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } // - // Add back replicated adapters. + // Add back replica groups. // - const ReplicatedAdapterDescriptorSeq& newAdpts = newApp.getInstance().replicatedAdapters; for(r = newAdpts.begin(); r != newAdpts.end(); ++r) { - _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing); + _adapterCache.getReplicaGroup(r->id, true)->set(application, r->loadBalancing); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.add(application, r->id, "", *o); @@ -1140,14 +1163,8 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp } // - // Add back node descriptors. + // Add back servers. // - const NodeDescriptorDict& newNodes = newApp.getInstance().nodes; - for(n = newNodes.begin(); n != newNodes.end(); ++n) - { - _nodeCache.get(n->first, true)->addDescriptor(application, n->second); - } - for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q) { entries.push_back(_serverCache.add(*q)); |