diff options
author | Jose <jose@zeroc.com> | 2014-04-18 18:31:48 +0200 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2014-04-18 18:31:48 +0200 |
commit | e7333297345efda9379045495d17aadb571ddd50 (patch) | |
tree | 5bfa86a78d29665e1ef50a9575b76114be1145e1 /cpp/src/IceGrid/Database.cpp | |
parent | Fixed replicaGroup test issue (diff) | |
download | ice-e7333297345efda9379045495d17aadb571ddd50.tar.bz2 ice-e7333297345efda9379045495d17aadb571ddd50.tar.xz ice-e7333297345efda9379045495d17aadb571ddd50.zip |
Fixed (ICE-4858) - Eliminate IceDB
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 579 |
1 files changed, 295 insertions, 284 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 216525c0314..25d513ac724 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -21,8 +21,8 @@ #include <IceGrid/ReplicaSessionI.h> #include <IceGrid/Session.h> #include <IceGrid/Topics.h> -#include <IceGrid/DB.h> #include <IceGrid/IceGrid.h> +#include <IceGrid/SerialsDict.h> #include <algorithm> #include <functional> @@ -30,12 +30,17 @@ using namespace std; using namespace IceGrid; - -using namespace IceDB; +using namespace Freeze; namespace { +const string applicationsDbName = "applications"; +const string adaptersDbName = "adapters"; +const string objectsDbName = "objects"; +const string internalObjectsDbName = "internal-objects"; +const string serialsDbName = "serials"; + struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool> { bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs) @@ -44,17 +49,33 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob } }; -template<typename K, typename V> vector<V> -toVector(const map<K,V>& m) +template<typename K, typename V, typename C, typename Comp> vector<V> +toVector(const Map<K, V, C, Comp>& m) { vector<V> v; - for(typename map<K,V>::const_iterator p = m.begin(); p != m.end(); ++p) + for(typename Map<K, V, C, Comp>::const_iterator p = m.begin(); p != m.end(); ++p) { v.push_back(p->second); } return v; } +template<typename K, typename V, typename C, typename Comp> map<K, V> +toMap(const Map<K, V, C, Comp>& d) +{ + std::map<K, V> m; + for(typename Map<K, V, C, Comp>::const_iterator p = d.begin(); p != d.end(); ++p) + { +#ifdef __SUNPRO_CC + std::map<Key, Value>::value_type v(p->first, p->second); + m.insert(v); +#else + m.insert(*p); +#endif + } + return m; +} + void halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { @@ -113,6 +134,73 @@ filterAdapterInfos(const string& filter, infos.swap(filteredAdpts); } +Ice::Long +getSerial(const Freeze::ConnectionPtr& connection, const string& dbName) +{ + SerialsDict dict(connection, serialsDbName); + + // + // If a serial number is provided, juste update the serial number from the database, + // otherwise if the serial is 0, we increment the serial from the database. + // + SerialsDict::iterator p = dict.find(dbName); + if(p == dict.end()) + { + dict.insert(SerialsDict::value_type(dbName, 1)); + return 1; + } + return p->second; +} + +Ice::Long +updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice::Long serial = 0) +{ + if(serial == -1) // Master doesn't support serials. + { + return -1; + } + + SerialsDict dict(connection, serialsDbName); + + // + // If a serial number is provided, juste update the serial number from the database, + // otherwise if the serial is 0, we increment the serial from the database. + // + SerialsDict::iterator p = dict.find(dbName); + if(p == dict.end()) + { + dict.insert(SerialsDict::value_type(dbName, serial == 0 ? 1 : serial)); + return 1; + } + else + { + p.set(serial == 0 ? p->second + 1 : serial); + return p->second; + } +} + +vector<AdapterInfo> +findByReplicaGroupId(const StringAdapterInfoDict& dict, const string& name) +{ + vector<AdapterInfo> result; + for(StringAdapterInfoDict::const_iterator p = dict.findByReplicaGroupId(name, true); p != dict.end(); ++p) + { + result.push_back(p->second); + } + return result; +} + +vector<ObjectInfo> +findByType(const IdentityObjectInfoDict& dict, const string& type) +{ + vector<ObjectInfo> result; + for(IdentityObjectInfoDict::const_iterator p = dict.findByType(type); p != dict.end(); ++p) + { + result.push_back(p->second); + } + return result; +} + } Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, @@ -120,7 +208,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, const string& instanceName, const TraceLevelsPtr& traceLevels, const RegistryInfo& info, - const DatabasePluginPtr& plugin, + const Freeze::ConnectionPtr& connection, + const string& envName, bool readonly) : _communicator(registryAdapter->getCommunicator()), _internalAdapter(registryAdapter), @@ -135,17 +224,18 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), - _connectionPool(plugin->getConnectionPool()), - _databasePlugin(plugin), + _connection(connection), + _envName(envName), + _applications(_connection, applicationsDbName), + _adapters(_connection, adaptersDbName), + _objects(_connection, objectsDbName), + _internalObjects(_connection, internalObjectsDbName), _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())), _lock(0) { ServerEntrySeq entries; - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - map<string, ApplicationInfo> applications = applicationsWrapper->getMap(); - for(map<string, ApplicationInfo>::iterator p = applications.begin(); p != applications.end(); ++p) + for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p) { try { @@ -167,25 +257,15 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); - _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper); - _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _connectionPool->getAdapters(connection)); - _objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection)); + _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, toMap(_applications), getSerial(_connection, applicationsDbName)); + _adapterObserverTopic = new AdapterObserverTopic(_topicManager, toMap(_adapters), getSerial(_connection, adaptersDbName)); + _objectObserverTopic = new ObjectObserverTopic(_topicManager, toMap(_objects), getSerial(_connection, objectsDbName)); _registryObserverTopic->registryUp(info); _pluginFacade->setDatabase(this); } -Database::~Database() -{ - // - // Release first the cache and then the plugin. This must be done in this order - // to make sure the plugin is destroyed after the database cache. - // - _connectionPool = 0; - _databasePlugin = 0; -} - std::string Database::getInstanceName() const { @@ -275,18 +355,16 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long map<string, ApplicationInfo> oldApplications; for(;;) { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); try { - TransactionHolder txHolder(connection); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - oldApplications = applicationsWrapper->getMap(); - applicationsWrapper->clear(); + TransactionHolder txHolder(_connection); + oldApplications = toMap(_applications); + _applications.clear(); for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) { - applicationsWrapper->put(p->descriptor.name, *p); + _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); } - dbSerial = applicationsWrapper->updateSerial(dbSerial); + dbSerial = updateSerial(_connection, applicationsDbName, dbSerial); txHolder.commit(); break; } @@ -299,7 +377,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long halt(_communicator, ex); } } - + ServerEntrySeq entries; set<string> names; @@ -357,17 +435,15 @@ Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial) Lock sync(*this); for(;;) { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); try { - TransactionHolder txHolder(connection); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - adaptersWrapper->clear(); + TransactionHolder txHolder(_connection); + _adapters.clear(); for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) { - adaptersWrapper->put(r->id, *r); + _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); } - dbSerial = adaptersWrapper->updateSerial(dbSerial); + dbSerial = updateSerial(_connection, adaptersDbName, dbSerial); txHolder.commit(); break; } @@ -401,17 +477,15 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial) Lock sync(*this); for(;;) { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); try { - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - objectsWrapper->clear(); + TransactionHolder txHolder(_connection); + _objects.clear(); for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { - objectsWrapper->put(q->proxy->ice_getIdentity(), *q); + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); } - dbSerial = objectsWrapper->updateSerial(dbSerial); + dbSerial = updateSerial(_connection, objectsDbName, dbSerial); txHolder.commit(); break; } @@ -443,11 +517,11 @@ Database::getApplications(Ice::Long& serial) const { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); TransactionHolder txHolder(connection); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - serial = applicationsWrapper->getSerial(); - return toVector(applicationsWrapper->getMap()); + StringApplicationInfoDict applications(connection, applicationsDbName); + serial = getSerial(connection, applicationsDbName); + return toVector(applications); } catch(const DeadlockException&) { @@ -467,11 +541,11 @@ Database::getAdapters(Ice::Long& serial) const { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); TransactionHolder txHolder(connection); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - serial = adaptersWrapper->getSerial(); - return toVector(adaptersWrapper->getMap()); + StringAdapterInfoDict adapters(connection, adaptersDbName); + serial = getSerial(connection, adaptersDbName); + return toVector(adapters); } catch(const DeadlockException&) { @@ -491,11 +565,11 @@ Database::getObjects(Ice::Long& serial) const { try { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - serial = objectsWrapper->getSerial(); - return toVector(objectsWrapper->getMap()); + IdentityObjectInfoDict objects(connection, objectsDbName); + serial = getSerial(connection, objectsDbName); + return toVector(objects); } catch(const DeadlockException&) { @@ -511,7 +585,7 @@ Database::getObjects(Ice::Long& serial) const StringLongDict Database::getSerials() const { - return _connectionPool->getSerials(); + return toMap(SerialsDict(Freeze::createConnection(_communicator, _envName), serialsDbName)); } void @@ -528,20 +602,15 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic waitForUpdate(info.descriptor.name); - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - try + StringApplicationInfoDict::const_iterator i = _applications.find(info.descriptor.name); + if(i != _applications.end()) { - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - applicationsWrapper->find(info.descriptor.name); throw DeploymentException("application `" + info.descriptor.name + "' already exists"); } - catch(const NotFoundException&) - { - } ApplicationHelper helper(_communicator, info.descriptor, true); - checkForAddition(helper, connection); - dbSerial = saveApplication(info, connection, dbSerial); + checkForAddition(helper, _connection); + dbSerial = saveApplication(info, _connection, dbSerial); load(helper, entries, info.uuid, info.revision); startUpdating(info.descriptor.name, info.uuid, info.revision); @@ -589,7 +658,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); - dbSerial = removeApplication(info.descriptor.name, _connectionPool->getConnection()); + dbSerial = removeApplication(info.descriptor.name, _connection); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name); @@ -635,16 +704,12 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A waitForUpdate(update.descriptor.name); - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - try - { - oldApp = applicationsWrapper->find(update.descriptor.name); - } - catch(const NotFoundException&) + StringApplicationInfoDict::const_iterator i = _applications.find(update.descriptor.name); + if(i == _applications.end()) { throw ApplicationNotExistException(update.descriptor.name); } + oldApp = i->second; if(update.revision < 0) { @@ -680,16 +745,12 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n waitForUpdate(newDesc.name); - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - try - { - oldApp = applicationsWrapper->find(newDesc.name); - } - catch(const NotFoundException&) + StringApplicationInfoDict::const_iterator i = _applications.find(newDesc.name); + if(i == _applications.end()) { throw ApplicationNotExistException(newDesc.name); } + oldApp = i->second; previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor)); helper.reset(new ApplicationHelper(_communicator, newDesc, true)); @@ -724,21 +785,18 @@ Database::instantiateServer(const string& application, try { - Lock sync(*this); + Lock sync(*this); checkSessionLock(session); waitForUpdate(application); - - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - try - { - oldApp = applicationsWrapper->find(application); - } - catch(const NotFoundException&) + + StringApplicationInfoDict::const_iterator i = _applications.find(application); + if(i == _applications.end()) { throw ApplicationNotExistException(application); + } + oldApp = i->second; previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor)); helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true)); @@ -772,17 +830,14 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon waitForUpdate(name); - DatabaseConnectionPtr connection = _connectionPool->getConnection(); ApplicationInfo appInfo; - try - { - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - appInfo = applicationsWrapper->find(name); - } - catch(const NotFoundException&) + + StringApplicationInfoDict::const_iterator i = _applications.find(name); + if(i == _applications.end()) { throw ApplicationNotExistException(name); } + appInfo = i->second; bool init = false; try @@ -800,7 +855,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon } } - dbSerial = removeApplication(name, connection, dbSerial); + dbSerial = removeApplication(name, _connection, dbSerial); startUpdating(name, appInfo.uuid, appInfo.revision); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); @@ -829,24 +884,22 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon ApplicationInfo Database::getApplicationInfo(const std::string& name) { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - try - { - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - return applicationsWrapper->find(name); - } - catch(const NotFoundException&) + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + StringApplicationInfoDict applications(connection, applicationsDbName); + StringApplicationInfoDict::const_iterator i = applications.find(name); + if(i == applications.end()) { throw ApplicationNotExistException(name); } + return i->second; } Ice::StringSeq Database::getAllApplications(const string& expression) { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); - return getMatchingKeys<map<string, ApplicationInfo> >(applicationsWrapper->getMap(), expression); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + StringApplicationInfoDict applications(connection, applicationsDbName); + return getMatchingKeys<map<string, ApplicationInfo> >(toMap(applications), expression); } void @@ -939,27 +992,29 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - try - { - adaptersWrapper->find(adapterId); - updated = true; - } - catch(const NotFoundException&) - { - } - + TransactionHolder txHolder(_connection); + StringAdapterInfoDict::iterator i = _adapters.find(adapterId); if(proxy) { - adaptersWrapper->put(adapterId, info); + if(i == _adapters.end()) + { + _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + } + else + { + updated = true; + i.set(info); + } } else { - adaptersWrapper->erase(adapterId); + if(i == _adapters.end()) + { + return; + } + _adapters.erase(i); } - dbSerial = adaptersWrapper->updateSerial(dbSerial); + dbSerial = updateSerial(_connection, adaptersDbName, dbSerial); txHolder.commit(); break; } @@ -1007,18 +1062,16 @@ Ice::ObjectPrx Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - try - { - return adaptersWrapper->find(id).proxy; - } - catch(const NotFoundException&) + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + StringAdapterInfoDict adapters(connection, adaptersDbName); + StringAdapterInfoDict::const_iterator i = adapters.find(id); + if(i != adapters.end()) { + return i->second.proxy; } Ice::EndpointSeq endpoints; - vector<AdapterInfo> infos = adaptersWrapper->findByReplicaGroupId(id); + vector<AdapterInfo> infos = findByReplicaGroupId(adapters, id); filterAdapterInfos("", id, _pluginFacade, con, ctx, infos); for(unsigned int i = 0; i < infos.size(); ++i) { @@ -1059,17 +1112,15 @@ Database::removeAdapter(const string& adapterId) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - try + TransactionHolder txHolder(_connection); + StringAdapterInfoDict::iterator i = _adapters.find(adapterId); + if(i != _adapters.end()) { - adaptersWrapper->find(adapterId); - adaptersWrapper->erase(adapterId); + _adapters.erase(i); } - catch(const NotFoundException&) + else { - infos = adaptersWrapper->findByReplicaGroupId(adapterId); + infos = findByReplicaGroupId(_adapters, adapterId); if(infos.empty()) { throw AdapterNotExistException(adapterId); @@ -1077,10 +1128,10 @@ Database::removeAdapter(const string& adapterId) for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p) { p->replicaGroupId.clear(); - adaptersWrapper->put(p->id, *p); + _adapters.put(StringAdapterInfoDict::value_type(p->id, *p)); } } - dbSerial = adaptersWrapper->updateSerial(); + dbSerial = updateSerial(_connection, adaptersDbName); txHolder.commit(); break; } @@ -1202,20 +1253,21 @@ Database::getAdapterInfo(const string& id) // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + StringAdapterInfoDict adapters(connection, adaptersDbName); AdapterInfoSeq infos; - try + StringAdapterInfoDict::const_iterator i = adapters.find(id); + if(i != adapters.end()) { - infos.push_back(adaptersWrapper->find(id)); + infos.push_back(i->second); } - catch(const NotFoundException&) + else { // // If it's not a regular object adapter, perhaps it's a replica // group... // - infos = adaptersWrapper->findByReplicaGroupId(id); + infos = findByReplicaGroupId(adapters, id); if(infos.empty()) { throw AdapterNotExistException(id); @@ -1257,20 +1309,21 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + StringAdapterInfoDict adapters(connection, adaptersDbName); AdapterInfoSeq infos; - try + StringAdapterInfoDict::const_iterator i = adapters.find(id); + if(i != adapters.end()) { - infos.push_back(adaptersWrapper->find(id)); + infos.push_back(i->second); } - catch(const NotFoundException&) + else { // // If it's not a regular object adapter, perhaps it's a replica // group... // - infos = adaptersWrapper->findByReplicaGroupId(id); + infos = findByReplicaGroupId(adapters, id); if(infos.empty()) { throw AdapterNotExistException(id); @@ -1339,10 +1392,7 @@ Database::getAllAdapters(const string& expression) result.swap(ids); set<string> groups; - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - map<string, AdapterInfo> adapters = adaptersWrapper->getMap(); - for(map<string, AdapterInfo>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { if(expression.empty() || IceUtilInternal::match(p->first, expression, true)) { @@ -1385,19 +1435,14 @@ Database::addObject(const ObjectInfo& info) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - try + TransactionHolder txHolder(_connection); + IdentityObjectInfoDict::const_iterator i = _objects.find(id); + if(i != _objects.end()) { - objectsWrapper->find(id); throw ObjectExistsException(id); } - catch(const NotFoundException&) - { - } - objectsWrapper->put(id, info); - dbSerial = objectsWrapper->updateSerial(); + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + dbSerial = updateSerial(_connection, objectsDbName); txHolder.commit(); break; } @@ -1442,19 +1487,18 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - try + TransactionHolder txHolder(_connection); + IdentityObjectInfoDict::iterator i = _objects.find(id); + if(i != _objects.end()) { - objectsWrapper->find(id); update = true; + i.set(info); } - catch(const NotFoundException&) + else { + _objects.put(IdentityObjectInfoDict::value_type(id, info)); } - objectsWrapper->put(id, info); - dbSerial = objectsWrapper->updateSerial(dbSerial); + dbSerial = updateSerial(_connection, objectsDbName, dbSerial); txHolder.commit(); break; } @@ -1508,22 +1552,17 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - try - { - objectsWrapper->find(id); - } - catch(const NotFoundException&) + TransactionHolder txHolder(_connection); + IdentityObjectInfoDict::iterator i = _objects.find(id); + if(i == _objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - objectsWrapper->erase(id); - dbSerial = objectsWrapper->updateSerial(dbSerial); + _objects.erase(i); + dbSerial = updateSerial(_connection, objectsDbName, dbSerial); txHolder.commit(); break; } @@ -1574,23 +1613,18 @@ Database::updateObject(const Ice::ObjectPrx& proxy) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - try - { - info = objectsWrapper->find(id); - } - catch(const NotFoundException&) + TransactionHolder txHolder(_connection); + IdentityObjectInfoDict::iterator i = _objects.find(id); + if(i == _objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - + info = i->second; info.proxy = proxy; - objectsWrapper->put(id, info); - dbSerial = objectsWrapper->updateSerial(); + i.set(info); + dbSerial = updateSerial(_connection, objectsDbName); txHolder.commit(); break; } @@ -1605,7 +1639,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } serial = _objectObserverTopic->objectUpdated(dbSerial, info); - if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); @@ -1623,12 +1656,10 @@ Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); + TransactionHolder txHolder(_connection); for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - objectsWrapper->put(p->proxy->ice_getIdentity(), *p); + _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); } txHolder.commit(); break; @@ -1653,12 +1684,10 @@ Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); + TransactionHolder txHolder(_connection); for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - objectsWrapper->erase(p->proxy->ice_getIdentity()); + _objects.erase(p->proxy->ice_getIdentity()); } txHolder.commit(); break; @@ -1689,18 +1718,16 @@ Database::getObjectProxy(const Ice::Identity& id) { } - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - try - { - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - return objectsWrapper->find(id).proxy; - } - catch(const NotFoundException&) + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, objectsDbName); + IdentityObjectInfoDict::const_iterator i = objects.find(id); + if(i == objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } + return i->second.proxy; } Ice::ObjectPrx @@ -1752,9 +1779,9 @@ Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, co { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - vector<ObjectInfo> infos = objectsWrapper->findByType(type); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, objectsDbName); + vector<ObjectInfo> infos = findByType(objects, type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1786,16 +1813,14 @@ Database::getObjectInfo(const Ice::Identity& id) { } - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - try - { - return objectsWrapper->find(id); - } - catch(const NotFoundException&) + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, objectsDbName); + IdentityObjectInfoDict::const_iterator i = objects.find(id); + if(i == objects.end()) { throw ObjectNotRegisteredException(id); } + return i->second; } ObjectInfoSeq @@ -1803,10 +1828,9 @@ Database::getAllObjectInfos(const string& expression) { ObjectInfoSeq infos = _objectCache.getAll(expression); - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - map<Ice::Identity, ObjectInfo> objects = objectsWrapper->getMap(); - for(map<Ice::Identity, ObjectInfo>::const_iterator p = objects.begin(); p != objects.end(); ++p) + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, objectsDbName); + for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) { if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true)) { @@ -1821,9 +1845,9 @@ Database::getObjectInfosByType(const string& type) { ObjectInfoSeq infos = _objectCache.getAllByType(type); - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - ObjectInfoSeq dbInfos = objectsWrapper->findByType(type); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, objectsDbName); + ObjectInfoSeq dbInfos = findByType(objects, type); for(unsigned int i = 0; i < dbInfos.size(); ++i) { infos.push_back(dbInfos[i]); @@ -1834,28 +1858,23 @@ Database::getObjectInfosByType(const string& type) void Database::addInternalObject(const ObjectInfo& info, bool replace) { - Lock sync(*this); + Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); for(;;) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection); + TransactionHolder txHolder(_connection); if(!replace) { - try + IdentityObjectInfoDict::const_iterator i = _internalObjects.find(id); + if(i != _internalObjects.end()) { - internalObjectsWrapper->find(id); throw ObjectExistsException(id); } - catch(const NotFoundException&) - { - } } - internalObjectsWrapper->put(id, info); + _internalObjects.put(IdentityObjectInfoDict::value_type(id, info)); txHolder.commit(); break; } @@ -1866,7 +1885,7 @@ Database::addInternalObject(const ObjectInfo& info, bool replace) catch(const DatabaseException& ex) { halt(_communicator, ex); - } + } } } @@ -1879,20 +1898,15 @@ Database::removeInternalObject(const Ice::Identity& id) { try { - DatabaseConnectionPtr connection = _connectionPool->getConnection(); - TransactionHolder txHolder(connection); - ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection); - try - { - internalObjectsWrapper->find(id); - } - catch(const NotFoundException&) + TransactionHolder txHolder(_connection); + IdentityObjectInfoDict::iterator i = _internalObjects.find(id); + if(i == _internalObjects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - internalObjectsWrapper->erase(id); + _internalObjects.erase(i); txHolder.commit(); break; } @@ -1912,9 +1926,9 @@ Database::getInternalObjectsByType(const string& type) { Ice::ObjectProxySeq proxies; - DatabaseConnectionPtr connection = _connectionPool->newConnection(); - ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection); - vector<ObjectInfo> infos = internalObjectsWrapper->findByType(type); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict internalObjects(connection, internalObjectsDbName); + vector<ObjectInfo> infos = findByType(internalObjects, type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1923,7 +1937,7 @@ Database::getInternalObjectsByType(const string& type) } void -Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectionPtr& connection) +Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& connection) { set<string> serverIds; set<string> adapterIds; @@ -1934,18 +1948,18 @@ Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectio for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition)); if(!adapterIds.empty()) { - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + StringAdapterInfoDict adapters(connection, adaptersDbName); for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p) { - checkAdapterForAddition(*p, adaptersWrapper); + checkAdapterForAddition(*p, adapters); } } if(!objectIds.empty()) { - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); + IdentityObjectInfoDict objects(connection, objectsDbName); for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p) { - checkObjectForAddition(*p, objectsWrapper); + checkObjectForAddition(*p, objects); } } @@ -1958,7 +1972,7 @@ Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectio void Database::checkForUpdate(const ApplicationHelper& origApp, const ApplicationHelper& newApp, - const DatabaseConnectionPtr& connection) + const ConnectionPtr& connection) { set<string> oldSvrs, newSvrs; set<string> oldAdpts, newAdpts; @@ -1975,10 +1989,10 @@ Database::checkForUpdate(const ApplicationHelper& origApp, set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts)); if(!addedAdpts.empty()) { - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + StringAdapterInfoDict adapters(connection, adaptersDbName); for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p) { - checkAdapterForAddition(*p, adaptersWrapper); + checkAdapterForAddition(*p, adapters); } } @@ -1986,10 +2000,10 @@ Database::checkForUpdate(const ApplicationHelper& origApp, set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs)); if(!addedObjs.empty()) { - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); + IdentityObjectInfoDict objects(connection, objectsDbName); for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p) { - checkObjectForAddition(*p, objectsWrapper); + checkObjectForAddition(*p, objects); } } @@ -2039,7 +2053,7 @@ Database::checkServerForAddition(const string& id) } void -Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& adaptersWrapper) +Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& adapters) { bool found = false; if(_adapterCache.has(id)) @@ -2048,14 +2062,14 @@ Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& ad } else { - try + StringAdapterInfoDict::const_iterator i = adapters.find(id); + if(i != adapters.end()) { - adaptersWrapper->find(id); found = true; } - catch(const NotFoundException&) + else { - if(adaptersWrapper->findByReplicaGroupId(id).size() != 0) + if(!findByReplicaGroupId(adapters, id).empty()) { found = true; } @@ -2071,7 +2085,7 @@ Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& ad } void -Database::checkObjectForAddition(const Ice::Identity& objectId, const ObjectsWrapperPtr& objectsWrapper) +Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityObjectInfoDict& objects) { bool found = false; if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId)) @@ -2080,14 +2094,11 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const ObjectsWra } else { - try + IdentityObjectInfoDict::const_iterator i = objects.find(objectId); + if(i != objects.end()) { - objectsWrapper->find(objectId); found = true; } - catch(const NotFoundException&) - { - } } if(found) @@ -2323,17 +2334,17 @@ Database::reload(const ApplicationHelper& oldApp, } Ice::Long -Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection, Ice::Long dbSerial) +Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& connection, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); for(;;) { try { - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); + StringApplicationInfoDict applications(connection, applicationsDbName); TransactionHolder txHolder(connection); - applicationsWrapper->put(info.descriptor.name, info); - dbSerial = applicationsWrapper->updateSerial(dbSerial); + applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); + dbSerial = updateSerial(connection, applicationsDbName, dbSerial); txHolder.commit(); break; } @@ -2350,17 +2361,17 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP } Ice::Long -Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection, Ice::Long dbSerial) +Database::removeApplication(const string& name, const ConnectionPtr& connection, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); for(;;) { try { - ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); + StringApplicationInfoDict applications(connection, applicationsDbName); TransactionHolder txHolder(connection); - applicationsWrapper->erase(name); - dbSerial = applicationsWrapper->updateSerial(dbSerial); + applications.erase(name); + dbSerial = updateSerial(connection, applicationsDbName, dbSerial); txHolder.commit(); break; } @@ -2560,7 +2571,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, Ice::Long dbSerial) { const ApplicationDescriptor& newDesc = helper.getDefinition(); - DatabaseConnectionPtr connection = _connectionPool->newConnection(); + ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); ServerEntrySeq entries; int serial = 0; |