diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-10-07 18:18:37 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-10-07 18:18:37 +0200 |
commit | 5fc2dc27228263e4c56ba3a49852ab3f8c724299 (patch) | |
tree | a1340491094705a1e604a3df22ec4dad0c8d1a8e /cpp/src/IceGrid/Database.cpp | |
parent | Bug 4251 - add IceUtil::Time double initializers (diff) | |
download | ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.tar.bz2 ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.tar.xz ice-5fc2dc27228263e4c56ba3a49852ab3f8c724299.zip |
- Bug 4286: added support for IceStorm/IceGrid database plugins
- Fixed IceGrid database code to first save to the database and then
do state changes.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 855 |
1 files changed, 575 insertions, 280 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 65379e1dff7..83469e5656a 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -9,6 +9,10 @@ #include <IceUtil/StringUtil.h> #include <IceUtil/Random.h> +#include <IceUtil/Functional.h> +#include <Ice/LoggerUtil.h> +#include <Ice/Communicator.h> +#include <Ice/ObjectAdapter.h> #include <IceGrid/Database.h> #include <IceGrid/TraceLevels.h> #include <IceGrid/Util.h> @@ -17,14 +21,7 @@ #include <IceGrid/ReplicaSessionI.h> #include <IceGrid/Session.h> #include <IceGrid/Topics.h> -#include <IceGrid/DatabaseWrapper.h> -#ifdef QTSQL -# include <IceUtil/Functional.h> -# include <Ice/Communicator.h> -# include <Ice/Instance.h> -# include <Ice/ObjectAdapter.h> -# include <Ice/LoggerUtil.h> -#endif +#include <IceGrid/DB.h> #include <algorithm> #include <functional> @@ -33,13 +30,9 @@ using namespace std; using namespace IceGrid; -#ifdef QTSQL -using namespace IceSQL; -#else -using namespace Freeze; -#endif +using namespace IceDB; -namespace IceGrid +namespace { struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool> @@ -74,6 +67,17 @@ isServerUpdated(const ServerInfo& lhs, const ServerInfo& rhs) } } +void +halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) +{ + { + Ice::Error error(com->getLogger()); + error << "fatal exception: " << ex << "\n*** Aborting application ***"; + } + + abort(); +} + } Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, @@ -81,11 +85,11 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, const string& instanceName, const TraceLevelsPtr& traceLevels, const RegistryInfo& info, + const DatabasePluginPtr& plugin, bool readonly) : _communicator(registryAdapter->getCommunicator()), _internalAdapter(registryAdapter), _topicManager(topicManager), - _envName("Registry"), _instanceName(instanceName), _traceLevels(traceLevels), _master(info.name == "Master"), @@ -96,20 +100,17 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), - _databaseCache(new IceGrid::DatabaseCache(_communicator, _envName, _instanceName, info.name)), + _databaseCache(plugin->getDatabaseCache()), + _databasePlugin(plugin), _lock(0), _applicationSerial(0) { ServerEntrySeq entries; DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); -#ifdef QTSQL - StringApplicationInfoDict applications = applicationsWrapper.getMap(); -#else - StringApplicationInfoDict& applications = applicationsWrapper.getMap(); -#endif - for(StringApplicationInfoDict::iterator p = applications.begin(); p != applications.end(); ++p) + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + map<string, ApplicationInfo> applications = applicationsWrapper->getMap(); + for(map<string, ApplicationInfo>::iterator p = applications.begin(); p != applications.end(); ++p) { try { @@ -131,19 +132,25 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); - _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper.getMap()); + _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applications); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); - _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper.getMap()); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper->getMap()); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper.getMap()); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper->getMap()); _registryObserverTopic->registryUp(info); } Database::~Database() { + // + // Release first the cache and then the plugin. This must be done in this order + // to make sure the plugin is destroyed after the database cache. + // + _databaseCache = 0; + _databasePlugin = 0; } std::string @@ -228,26 +235,49 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications) int serial = 0; // Initialize to prevent warning. { Lock sync(*this); - - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); + map<string, ApplicationInfo> oldApplications; + for(;;) + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + try + { + TransactionHolder txHolder(connection); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + oldApplications = applicationsWrapper->getMap(); + applicationsWrapper->clear(); + for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) + { + applicationsWrapper->put(p->descriptor.name, *p); + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } + ServerEntrySeq entries; set<string> names; - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) { try { - try + map<string, ApplicationInfo>::const_iterator q = oldApplications.find(p->descriptor.name); + if(q != oldApplications.end()) { - ApplicationInfo info = applicationsWrapper.find(p->descriptor.name); - ApplicationHelper previous(_communicator, info.descriptor); + ApplicationHelper previous(_communicator, q->second.descriptor); ApplicationHelper helper(_communicator, p->descriptor); reload(previous, helper, entries, p->uuid, p->revision); } - catch(const NotFoundException&) + else { load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); } @@ -257,29 +287,19 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications) Ice::Warning warn(_traceLevels->logger); warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; } - applicationsWrapper.put(p->descriptor.name, *p); names.insert(p->descriptor.name); } -#ifdef QTSQL - StringApplicationInfoDict applications = applicationsWrapper.getMap(); -#else - StringApplicationInfoDict& applications = applicationsWrapper.getMap(); -#endif - StringApplicationInfoDict::iterator s = applications.begin(); - while(s != applications.end()) + for(map<string, ApplicationInfo>::iterator s = oldApplications.begin(); s != oldApplications.end(); ++s) { if(names.find(s->first) == names.end()) { unload(ApplicationHelper(_communicator, s->second.descriptor), entries); - applicationsWrapper.erase(s->first); } - ++s; } - ++_applicationSerial; - + + ++_applicationSerial; serial = _applicationObserverTopic->applicationInit(_applicationSerial, newApplications); - txHolder.commit(); } _applicationObserverTopic->waitForSyncedSubscribers(serial); } @@ -290,18 +310,31 @@ Database::syncAdapters(const AdapterInfoSeq& adapters) int serial; { Lock sync(*this); - - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); - - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); - adaptersWrapper.clear(); - for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) + for(;;) { - adaptersWrapper.put(*r); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + try + { + TransactionHolder txHolder(connection); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + adaptersWrapper->clear(); + for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) + { + adaptersWrapper->put(r->id, *r); + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } serial = _adapterObserverTopic->adapterInit(adapters); - txHolder.commit(); } _adapterObserverTopic->waitForSyncedSubscribers(serial); } @@ -312,19 +345,31 @@ Database::syncObjects(const ObjectInfoSeq& objects) int serial; { Lock sync(*this); - - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); - - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - - objectsWrapper.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + for(;;) { - objectsWrapper.put(q->proxy->ice_getIdentity(), *q); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + try + { + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + objectsWrapper->clear(); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + objectsWrapper->put(q->proxy->ice_getIdentity(), *q); + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } serial = _objectObserverTopic->objectInit(objects); - txHolder.commit(); } _objectObserverTopic->waitForSyncedSubscribers(serial); } @@ -332,19 +377,19 @@ Database::syncObjects(const ObjectInfoSeq& objects) void Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); - ServerEntrySeq entries; + try { Lock sync(*this); checkSessionLock(session); waitForUpdate(info.descriptor.name); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); try { - applicationsWrapper.find(info.descriptor.name); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + applicationsWrapper->find(info.descriptor.name); throw DeploymentException("application `" + info.descriptor.name + "' already exists"); } catch(const NotFoundException&) @@ -352,10 +397,15 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) } ApplicationHelper helper(_communicator, info.descriptor, true); - checkForAddition(helper); + checkForAddition(helper, connection); + saveApplication(info, connection); load(helper, entries, info.uuid, info.revision); startUpdating(info.descriptor.name, info.uuid, info.revision); } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } if(_master) { @@ -370,12 +420,17 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); + removeApplication(info.descriptor.name, _databaseCache->getConnection()); } catch(const DeploymentException& ex) { Ice::Error err(_traceLevels->logger); err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason; } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } finishUpdating(info.descriptor.name); throw ex; } @@ -384,9 +439,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) int serial; { Lock sync(*this); - - ++_applicationSerial; - applicationsWrapper.put(info.descriptor.name, info); + ++_applicationSerial; serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info); if(_traceLevels->application > 0) @@ -408,6 +461,7 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se ApplicationInfo oldApp; ApplicationDescriptor newDesc; ApplicationUpdateInfo update = updt; + try { Lock sync(*this); checkSessionLock(session); @@ -415,10 +469,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se waitForUpdate(update.descriptor.name); DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); try { - oldApp = applicationsWrapper.find(update.descriptor.name); + oldApp = applicationsWrapper->find(update.descriptor.name); } catch(const NotFoundException&) { @@ -432,14 +486,25 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, previous.update(update.descriptor), true); + newDesc = helper.getDefinition(); - checkForUpdate(previous, helper); - reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); + checkForUpdate(previous, helper, connection); - newDesc = helper.getDefinition(); + ApplicationInfo info = oldApp; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; + info.descriptor = newDesc; + saveApplication(info, connection); + + reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } finishApplicationUpdate(entries, update, oldApp, newDesc, session); } @@ -450,6 +515,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS ServerEntrySeq entries; ApplicationUpdateInfo update; ApplicationInfo oldApp; + try { Lock sync(*this); checkSessionLock(session); @@ -457,10 +523,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS waitForUpdate(newDesc.name); DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); try { - oldApp = applicationsWrapper.find(newDesc.name); + oldApp = applicationsWrapper->find(newDesc.name); } catch(const NotFoundException&) { @@ -475,11 +541,23 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS update.revision = oldApp.revision + 1; update.descriptor = helper.diff(previous); - checkForUpdate(previous, helper); + checkForUpdate(previous, helper, connection); + + ApplicationInfo info = oldApp; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; + info.descriptor = newDesc; + saveApplication(info, connection); + reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } finishApplicationUpdate(entries, update, oldApp, newDesc, session); } @@ -494,6 +572,8 @@ Database::instantiateServer(const string& application, ApplicationUpdateInfo update; ApplicationInfo oldApp; ApplicationDescriptor newDesc; + + try { Lock sync(*this); checkSessionLock(session); @@ -501,10 +581,10 @@ Database::instantiateServer(const string& application, waitForUpdate(application); DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); try { - oldApp = applicationsWrapper.find(application); + oldApp = applicationsWrapper->find(application); } catch(const NotFoundException&) { @@ -513,19 +593,30 @@ Database::instantiateServer(const string& application, ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true); + newDesc = helper.getDefinition(); update.updateTime = IceUtil::Time::now().toMilliSeconds(); update.updateUser = _lockUserId; update.revision = oldApp.revision + 1; update.descriptor = helper.diff(previous); - checkForUpdate(previous, helper); - reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); + checkForUpdate(previous, helper, connection); - newDesc = helper.getDefinition(); + ApplicationInfo info = oldApp; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; + info.descriptor = newDesc; + saveApplication(info, connection); + + reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } finishApplicationUpdate(entries, update, oldApp, newDesc, session); } @@ -533,21 +624,22 @@ Database::instantiateServer(const string& application, void Database::removeApplication(const string& name, AdminSessionI* session) { - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); - ServerEntrySeq entries; int serial; + + try { Lock sync(*this); checkSessionLock(session); waitForUpdate(name); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); ApplicationInfo appInfo; try { - appInfo = applicationsWrapper.find(name); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + appInfo = applicationsWrapper->find(name); } catch(const NotFoundException&) { @@ -560,6 +652,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) ApplicationHelper helper(_communicator, appInfo.descriptor); init = true; checkForRemove(helper); + removeApplication(name, connection); unload(helper, entries); } catch(const DeploymentException&) @@ -574,10 +667,15 @@ Database::removeApplication(const string& name, AdminSessionI* session) // it's invalid, it's most likely not loaded either. So we // ignore the error and erase the descriptor. // + removeApplication(name, connection); } startUpdating(name, appInfo.uuid, appInfo.revision); } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } if(_master) { @@ -587,9 +685,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) { Lock sync(*this); - applicationsWrapper.erase(name); ++_applicationSerial; - serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name); if(_traceLevels->application > 0) @@ -610,8 +706,8 @@ Database::getApplicationInfo(const std::string& name) DatabaseConnectionPtr connection = _databaseCache->newConnection(); try { - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); - return applicationsWrapper.find(name); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + return applicationsWrapper->find(name); } catch(const NotFoundException&) { @@ -623,8 +719,8 @@ Ice::StringSeq Database::getAllApplications(const string& expression) { DatabaseConnectionPtr connection = _databaseCache->newConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); - return getMatchingKeys<StringApplicationInfoDict>(applicationsWrapper.getMap(), expression); + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + return getMatchingKeys<map<string, ApplicationInfo> >(applicationsWrapper->getMap(), expression); } void @@ -705,43 +801,46 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr } AdapterInfo info; - bool found = false; + info.id = adapterId; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); - try - { - info = adaptersWrapper.find(adapterId); - found = true; - } - catch(const NotFoundException&) - { - } bool updated = false; - if(proxy) + for(;;) { - if(found) + try { - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - adaptersWrapper.put(info); - updated = true; + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + try + { + adaptersWrapper->find(adapterId); + updated = true; + } + catch(const NotFoundException&) + { + } + + if(proxy) + { + adaptersWrapper->put(adapterId, info); + } + else + { + adaptersWrapper->erase(adapterId); + } + txHolder.commit(); + break; } - else + catch(const DeadlockException&) { - info.id = adapterId; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - adaptersWrapper.put(info); - } - } - else - { - if(!found) + continue; + } + catch(const DatabaseException& ex) { - return; + halt(_communicator, ex); } - adaptersWrapper.erase(adapterId); } if(_traceLevels->adapter > 0) @@ -777,17 +876,17 @@ Ice::ObjectPrx Database::getAdapterDirectProxy(const string& id) { DatabaseConnectionPtr connection = _databaseCache->newConnection(); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); try { - return adaptersWrapper.find(id).proxy; + return adaptersWrapper->find(id).proxy; } catch(const NotFoundException&) { } Ice::EndpointSeq endpoints; - vector<AdapterInfo> infos = adaptersWrapper.findByReplicaGroupId(id); + vector<AdapterInfo> infos = adaptersWrapper->findByReplicaGroupId(id); for(unsigned int i = 0; i < infos.size(); ++i) { Ice::EndpointSeq edpts = infos[i].proxy->ice_getEndpoints(); @@ -816,31 +915,44 @@ Database::removeAdapter(const string& adapterId) throw ex; } - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); - - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); - AdapterInfoSeq infos; - try - { - adaptersWrapper.find(adapterId); - adaptersWrapper.erase(adapterId); - } - catch(const NotFoundException&) + for(;;) { - infos = adaptersWrapper.findByReplicaGroupId(adapterId); - if(infos.size() == 0) + try { - throw AdapterNotExistException(adapterId); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + try + { + adaptersWrapper->find(adapterId); + adaptersWrapper->erase(adapterId); + } + catch(const NotFoundException&) + { + infos = adaptersWrapper->findByReplicaGroupId(adapterId); + if(infos.empty()) + { + throw AdapterNotExistException(adapterId); + } + for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p) + { + p->replicaGroupId.clear(); + adaptersWrapper->put(p->id, *p); + } + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; } - for(unsigned int i = 0; i < infos.size(); ++i) + catch(const DatabaseException& ex) { - infos[i].replicaGroupId = ""; - adaptersWrapper.put(infos[i]); + halt(_communicator, ex); } } - if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); @@ -858,7 +970,6 @@ Database::removeAdapter(const string& adapterId) serial = _adapterObserverTopic->adapterUpdated(*p); } } - txHolder.commit(); } _adapterObserverTopic->waitForSyncedSubscribers(serial); } @@ -904,11 +1015,11 @@ Database::getAdapterInfo(const string& id) // entry the adapter is managed by the registry itself. // DatabaseConnectionPtr connection = _databaseCache->getConnection(); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); AdapterInfoSeq infos; try { - infos.push_back(adaptersWrapper.find(id)); + infos.push_back(adaptersWrapper->find(id)); } catch(const NotFoundException&) { @@ -916,7 +1027,7 @@ Database::getAdapterInfo(const string& id) // If it's not a regular object adapter, perhaps it's a replica // group... // - infos = adaptersWrapper.findByReplicaGroupId(id); + infos = adaptersWrapper->findByReplicaGroupId(id); if(infos.size() == 0) { throw AdapterNotExistException(id); @@ -936,13 +1047,9 @@ Database::getAllAdapters(const string& expression) set<string> groups; DatabaseConnectionPtr connection = _databaseCache->getConnection(); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); -#ifdef QTSQL - StringAdapterInfoDict adapters = adaptersWrapper.getMap(); -#else - StringAdapterInfoDict& adapters = adaptersWrapper.getMap(); -#endif - for(StringAdapterInfoDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + map<string, AdapterInfo> adapters = adaptersWrapper->getMap(); + for(map<string, AdapterInfo>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { if(expression.empty() || IceUtilInternal::match(p->first, expression, true)) { @@ -978,17 +1085,34 @@ Database::addObject(const ObjectInfo& info) throw ObjectExistsException(id); } - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - try - { - objectsWrapper.find(id); - throw ObjectExistsException(id); - } - catch(const NotFoundException&) + for(;;) { + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + try + { + objectsWrapper->find(id); + throw ObjectExistsException(id); + } + catch(const NotFoundException&) + { + } + objectsWrapper->put(id, info); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - objectsWrapper.put(id, info); serial = _objectObserverTopic->objectAdded(info); @@ -1015,17 +1139,34 @@ Database::addOrUpdateObject(const ObjectInfo& info) } bool update = false; - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - try - { - objectsWrapper.find(id); - update = true; - } - catch(const NotFoundException&) + for(;;) { + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + try + { + objectsWrapper->find(id); + update = true; + } + catch(const NotFoundException&) + { + } + objectsWrapper->put(id, info); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - objectsWrapper.put(id, info); if(update) { @@ -1061,21 +1202,38 @@ Database::removeObject(const Ice::Identity& id) throw ex; } - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - try - { - objectsWrapper.find(id); - } - catch(const NotFoundException&) + for(;;) { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + try + { + objectsWrapper->find(id); + } + catch(const NotFoundException&) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + + objectsWrapper->erase(id); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - objectsWrapper.erase(id); - serial = _objectObserverTopic->objectRemoved(id); if(_traceLevels->object > 0) @@ -1105,23 +1263,39 @@ Database::updateObject(const Ice::ObjectPrx& proxy) throw ex; } - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - ObjectInfo info; - try - { - info = objectsWrapper.find(id); - } - catch(const NotFoundException&) + for(;;) { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + try + { + info = objectsWrapper->find(id); + } + catch(const NotFoundException&) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + + info.proxy = proxy; + objectsWrapper->put(id, info); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - - info.proxy = proxy; - objectsWrapper.put(id, info); serial = _objectObserverTopic->objectUpdated(info); @@ -1139,17 +1313,30 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) { Lock sync(*this); - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); - - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + for(;;) { - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - objectsWrapper.put(p->proxy->ice_getIdentity(), *p); + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + objectsWrapper->put(p->proxy->ice_getIdentity(), *p); + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } int serial = _objectObserverTopic->objectsAddedOrUpdated(objects); - - txHolder.commit(); return serial; } @@ -1158,17 +1345,30 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) { Lock sync(*this); - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - TransactionHolder txHolder(connection); - - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + for(;;) { - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - objectsWrapper.erase(p->proxy->ice_getIdentity()); + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + objectsWrapper->erase(p->proxy->ice_getIdentity()); + } + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } _objectObserverTopic->objectsRemoved(objects); - - txHolder.commit(); } Ice::ObjectPrx @@ -1188,8 +1388,8 @@ Database::getObjectProxy(const Ice::Identity& id) DatabaseConnectionPtr connection = _databaseCache->newConnection(); try { - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - return objectsWrapper.find(id).proxy; + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + return objectsWrapper->find(id).proxy; } catch(const NotFoundException&) { @@ -1248,8 +1448,8 @@ Database::getObjectsByType(const string& type) Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); DatabaseConnectionPtr connection = _databaseCache->newConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - vector<ObjectInfo> infos = objectsWrapper.findByType(type); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + vector<ObjectInfo> infos = objectsWrapper->findByType(type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1270,10 +1470,10 @@ Database::getObjectInfo(const Ice::Identity& id) } DatabaseConnectionPtr connection = _databaseCache->newConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); try { - return objectsWrapper.find(id); + return objectsWrapper->find(id); } catch(const NotFoundException&) { @@ -1287,13 +1487,9 @@ Database::getAllObjectInfos(const string& expression) ObjectInfoSeq infos = _objectCache.getAll(expression); DatabaseConnectionPtr connection = _databaseCache->newConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); -#ifdef QTSQL - IdentityObjectInfoDict objects = objectsWrapper.getMap(); -#else - IdentityObjectInfoDict& objects = objectsWrapper.getMap(); -#endif - for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + map<Ice::Identity, ObjectInfo> objects = objectsWrapper->getMap(); + for(map<Ice::Identity, ObjectInfo>::const_iterator p = objects.begin(); p != objects.end(); ++p) { if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true)) { @@ -1309,8 +1505,8 @@ Database::getObjectInfosByType(const string& type) ObjectInfoSeq infos = _objectCache.getAllByType(type); DatabaseConnectionPtr connection = _databaseCache->newConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); - ObjectInfoSeq dbInfos = objectsWrapper.findByType(type); + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + ObjectInfoSeq dbInfos = objectsWrapper->findByType(type); for(unsigned int i = 0; i < dbInfos.size(); ++i) { infos.push_back(dbInfos[i]); @@ -1324,20 +1520,37 @@ Database::addInternalObject(const ObjectInfo& info, bool replace) Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); - if(!replace) + for(;;) { try { - internalObjectsWrapper.find(id); - throw ObjectExistsException(id); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr internalObjectsWrapper = _databaseCache->getInternalObjects(connection); + if(!replace) + { + try + { + internalObjectsWrapper->find(id); + throw ObjectExistsException(id); + } + catch(const NotFoundException&) + { + } + } + internalObjectsWrapper->put(id, info); + txHolder.commit(); + break; } - catch(const NotFoundException&) + catch(const DeadlockException&) { + continue; } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - internalObjectsWrapper.put(id, info); } void @@ -1345,19 +1558,36 @@ Database::removeInternalObject(const Ice::Identity& id) { Lock sync(*this); - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); - try - { - internalObjectsWrapper.find(id); - } - catch(const NotFoundException&) + for(;;) { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; + try + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr internalObjectsWrapper = _databaseCache->getInternalObjects(connection); + try + { + internalObjectsWrapper->find(id); + } + catch(const NotFoundException&) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + internalObjectsWrapper->erase(id); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } } - internalObjectsWrapper.erase(id); } Ice::ObjectProxySeq @@ -1366,8 +1596,8 @@ Database::getInternalObjectsByType(const string& type) Ice::ObjectProxySeq proxies; DatabaseConnectionPtr connection = _databaseCache->newConnection(); - InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); - vector<ObjectInfo> infos = internalObjectsWrapper.findByType(type); + ObjectsWrapperPtr internalObjectsWrapper = _databaseCache->getInternalObjects(connection); + vector<ObjectInfo> infos = internalObjectsWrapper->findByType(type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1376,7 +1606,7 @@ Database::getInternalObjectsByType(const string& type) } void -Database::checkForAddition(const ApplicationHelper& app) +Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectionPtr& connection) { set<string> serverIds; set<string> adapterIds; @@ -1385,8 +1615,22 @@ Database::checkForAddition(const ApplicationHelper& app) app.getIds(serverIds, adapterIds, objectIds); for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition)); - for_each(adapterIds.begin(), adapterIds.end(), objFunc(*this, &Database::checkAdapterForAddition)); - for_each(objectIds.begin(), objectIds.end(), objFunc(*this, &Database::checkObjectForAddition)); + if(!adapterIds.empty()) + { + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p) + { + checkAdapterForAddition(*p, adaptersWrapper); + } + } + if(!objectIds.empty()) + { + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p) + { + checkObjectForAddition(*p, objectsWrapper); + } + } set<string> repGrps; set<string> adptRepGrps; @@ -1395,7 +1639,9 @@ Database::checkForAddition(const ApplicationHelper& app) } void -Database::checkForUpdate(const ApplicationHelper& origApp, const ApplicationHelper& newApp) +Database::checkForUpdate(const ApplicationHelper& origApp, + const ApplicationHelper& newApp, + const DatabaseConnectionPtr& connection) { set<string> oldSvrs, newSvrs; set<string> oldAdpts, newAdpts; @@ -1410,11 +1656,25 @@ Database::checkForUpdate(const ApplicationHelper& origApp, const ApplicationHelp Ice::StringSeq addedAdpts; set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts)); - for_each(addedAdpts.begin(), addedAdpts.end(), objFunc(*this, &Database::checkAdapterForAddition)); + if(!addedAdpts.empty()) + { + AdaptersWrapperPtr adaptersWrapper = _databaseCache->getAdapters(connection); + for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p) + { + checkAdapterForAddition(*p, adaptersWrapper); + } + } vector<Ice::Identity> addedObjs; set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs)); - for_each(addedObjs.begin(), addedObjs.end(), objFunc(*this, &Database::checkObjectForAddition)); + if(!addedObjs.empty()) + { + ObjectsWrapperPtr objectsWrapper = _databaseCache->getObjects(connection); + for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p) + { + checkObjectForAddition(*p, objectsWrapper); + } + } set<string> oldRepGrps, newRepGrps; set<string> oldAdptRepGrps, newAdptRepGrps; @@ -1462,7 +1722,7 @@ Database::checkServerForAddition(const string& id) } void -Database::checkAdapterForAddition(const string& id) +Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& adaptersWrapper) { bool found = false; if(_adapterCache.has(id)) @@ -1471,16 +1731,14 @@ Database::checkAdapterForAddition(const string& id) } else { - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); try { - adaptersWrapper.find(id); + adaptersWrapper->find(id); found = true; } catch(const NotFoundException&) { - if(adaptersWrapper.findByReplicaGroupId(id).size() != 0) + if(adaptersWrapper->findByReplicaGroupId(id).size() != 0) { found = true; } @@ -1496,7 +1754,7 @@ Database::checkAdapterForAddition(const string& id) } void -Database::checkObjectForAddition(const Ice::Identity& objectId) +Database::checkObjectForAddition(const Ice::Identity& objectId, const ObjectsWrapperPtr& objectsWrapper) { bool found = false; if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId)) @@ -1505,11 +1763,9 @@ Database::checkObjectForAddition(const Ice::Identity& objectId) } else { - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ObjectsDictWrapper objectsWrapper(_databaseCache, connection); try { - objectsWrapper.find(objectId); + objectsWrapper->find(objectId); found = true; } catch(const NotFoundException&) @@ -1758,6 +2014,54 @@ Database::reload(const ApplicationHelper& oldApp, } void +Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection) +{ + for(;;) + { + try + { + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + TransactionHolder txHolder(connection); + applicationsWrapper->put(info.descriptor.name, info); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } +} + +void +Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection) +{ + for(;;) + { + try + { + ApplicationsWrapperPtr applicationsWrapper = _databaseCache->getApplications(connection); + TransactionHolder txHolder(connection); + applicationsWrapper->erase(name); + txHolder.commit(); + break; + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } +} + +void Database::finishApplicationUpdate(ServerEntrySeq& entries, const ApplicationUpdateInfo& update, const ApplicationInfo& oldApp, @@ -1782,6 +2086,7 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, entries.clear(); ApplicationHelper previous(_communicator, newDesc); ApplicationHelper helper(_communicator, oldApp.descriptor); + saveApplication(oldApp, _databaseCache->getConnection()); reload(previous, helper, entries, oldApp.uuid, oldApp.revision); } @@ -1810,16 +2115,6 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, int serial; { Lock sync(*this); - - ApplicationInfo info = oldApp; - info.updateTime = update.updateTime; - info.updateUser = update.updateUser; - info.revision = update.revision; - info.descriptor = newDesc; - - DatabaseConnectionPtr connection = _databaseCache->getConnection(); - ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); - applicationsWrapper.put(update.descriptor.name, info); ++_applicationSerial; if(_traceLevels->application > 0) |