diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 1200 |
1 files changed, 630 insertions, 570 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 8782672ac31..973f39272b3 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -22,7 +22,6 @@ #include <IceGrid/Session.h> #include <IceGrid/Topics.h> #include <IceGrid/IceGrid.h> -#include <IceGrid/SerialsDict.h> #include <algorithm> #include <functional> @@ -30,15 +29,23 @@ using namespace std; using namespace IceGrid; -using namespace Freeze; + +typedef IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream> ApplicationMapRWCursor; +typedef IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream> AdapterMapROCursor; +typedef IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream> AdaptersByGroupMapCursor; +typedef IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> ObjectsByTypeMapROCursor; +typedef IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream> ObjectsMapROCursor; namespace { const string applicationsDbName = "applications"; const string adaptersDbName = "adapters"; +const string adaptersByReplicaGroupIdDbName = "adaptersByReplicaGroupId"; const string objectsDbName = "objects"; +const string objectsByTypeDbName = "objectsByType"; const string internalObjectsDbName = "internal-objects"; +const string internalObjectsByTypeDbName = "internal-objectsByType"; const string serialsDbName = "serials"; struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool> @@ -49,42 +56,41 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob } }; -template<typename K, typename V, typename C, typename Comp> vector<V> -toVector(const Map<K, V, C, Comp>& m) +template<typename K, typename V, typename C, typename H> vector<V> +toVector(const IceDB::ReadOnlyTxn& txn, const IceDB::Dbi<K, V, C, H>& m) { vector<V> v; - for(typename Map<K, V, C, Comp>::const_iterator p = m.begin(); p != m.end(); ++p) + IceDB::ReadOnlyCursor<K, V, C, H> cursor(m, txn); + K key; + V value; + while(cursor.get(key, value, MDB_NEXT)) { - v.push_back(p->second); + v.push_back(value); } return v; } -template<typename K, typename V, typename C, typename Comp> map<K, V> -toMap(const Map<K, V, C, Comp>& d) +template<typename K, typename V, typename C, typename H> map<K, V> +toMap(const IceDB::Txn& txn, const IceDB::Dbi<K, V, C, H>& d) { std::map<K, V> m; - for(typename Map<K, V, C, Comp>::const_iterator p = d.begin(); p != d.end(); ++p) + IceDB::Cursor<K, V, C, H> cursor(d, txn); + K key; + V value; + while(cursor.get(key, value, MDB_NEXT)) { -#ifdef __SUNPRO_CC - std::map<Key, Value>::value_type v(p->first, p->second); + typename std::map<K, V>::value_type v(key, value); m.insert(v); -#else - m.insert(*p); -#endif } + cursor.close(); return m; } void -halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) +logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex) { - { - Ice::Error error(com->getLogger()); - error << "fatal exception: " << ex << "\n*** Aborting application ***"; - } - - abort(); + Ice::Error error(com->getLogger()); + error << "LMDB error: " << ex; } void @@ -134,69 +140,52 @@ filterAdapterInfos(const string& filter, infos.swap(filteredAdpts); } -Ice::Long -getSerial(const Freeze::ConnectionPtr& connection, const string& dbName) -{ - SerialsDict dict(connection, serialsDbName); - - // - // If a serial number is provided, juste update the serial number from the database, - // otherwise if the serial is 0, we increment the serial from the database. - // - SerialsDict::iterator p = dict.find(dbName); - if(p == dict.end()) - { - dict.insert(SerialsDict::value_type(dbName, 1)); - return 1; - } - return p->second; -} - -Ice::Long -updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice::Long serial = 0) -{ - if(serial == -1) // Master doesn't support serials. - { - return -1; - } - - SerialsDict dict(connection, serialsDbName); - - // - // If a serial number is provided, juste update the serial number from the database, - // otherwise if the serial is 0, we increment the serial from the database. - // - SerialsDict::iterator p = dict.find(dbName); - if(p == dict.end()) - { - dict.insert(SerialsDict::value_type(dbName, serial == 0 ? 1 : serial)); - return 1; - } - else - { - p.set(serial == 0 ? p->second + 1 : serial); - return p->second; - } -} - vector<AdapterInfo> -findByReplicaGroupId(const StringAdapterInfoDict& dict, const string& name) +findByReplicaGroupId(const IceDB::Txn& txn, + const StringAdapterInfoMap& adapters, + const StringStringMap& adaptersByGroupId, + const string& name) { vector<AdapterInfo> result; - for(StringAdapterInfoDict::const_iterator p = dict.findByReplicaGroupId(name, true); p != dict.end(); ++p) + AdaptersByGroupMapCursor cursor(adaptersByGroupId, txn); + string id; + if(cursor.find(name, id)) { - result.push_back(p->second); + AdapterInfo info; + adapters.get(txn, id, info); + result.push_back(info); + + string n; + while(cursor.get(n, id, MDB_NEXT) && n == name) + { + adapters.get(txn, id, info); + result.push_back(info); + } } return result; } vector<ObjectInfo> -findByType(const IdentityObjectInfoDict& dict, const string& type) +findByType(const IceDB::ReadOnlyTxn& txn, + const IdentityObjectInfoMap& objects, + const StringIdentityMap& objectsByType, + const string& type) { vector<ObjectInfo> result; - for(IdentityObjectInfoDict::const_iterator p = dict.findByType(type); p != dict.end(); ++p) + ObjectsByTypeMapROCursor cursor(objectsByType, txn); + Ice::Identity id; + if(cursor.find(type, id)) { - result.push_back(p->second); + ObjectInfo info; + objects.get(txn, id, info); + result.push_back(info); + + string t; + while(cursor.get(t, id, MDB_NEXT) && t == type) + { + objects.get(txn, id, info); + result.push_back(info); + } } return result; } @@ -208,8 +197,6 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, const string& instanceName, const TraceLevelsPtr& traceLevels, const RegistryInfo& info, - const Freeze::ConnectionPtr& connection, - const string& envName, bool readonly) : _communicator(registryAdapter->getCommunicator()), _internalAdapter(registryAdapter), @@ -224,27 +211,47 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), - _connection(connection), - _envName(envName), - _applications(_connection, applicationsDbName), - _adapters(_connection, adaptersDbName), - _objects(_connection, objectsDbName), - _internalObjects(_connection, internalObjectsDbName), + _dbLock(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path") + "/icedb.lock"), + _env(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path"), 8, + IceDB::getMapSize(_communicator->getProperties()->getPropertyAsInt("IceGrid.Registry.LMDB.MapSize"))), _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())), _lock(0) { + IceDB::ReadWriteTxn txn(_env); + + IceDB::IceContext context; + context.communicator = _communicator; + context.encoding.major = 1; + context.encoding.minor = 1; + + _applications = StringApplicationInfoMap(txn, applicationsDbName, context, MDB_CREATE); + + _adapters = StringAdapterInfoMap(txn, adaptersDbName, context, MDB_CREATE); + _adaptersByGroupId = StringStringMap(txn, adaptersByReplicaGroupIdDbName, context, MDB_CREATE|MDB_DUPSORT); + + _objects = IdentityObjectInfoMap(txn, objectsDbName, context, MDB_CREATE); + _objectsByType = StringIdentityMap(txn, objectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT); + + _internalObjects = IdentityObjectInfoMap(txn, internalObjectsDbName, context, MDB_CREATE); + _internalObjectsByType = StringIdentityMap(txn, internalObjectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT); + + _serials = StringLongMap(txn, serialsDbName, context, MDB_CREATE); + ServerEntrySeq entries; - for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p) + string k; + ApplicationInfo v; + ApplicationMapRWCursor cursor(_applications, txn); + while(cursor.get(k, v, MDB_NEXT)) { try { - load(ApplicationHelper(_communicator, p->second.descriptor), entries, p->second.uuid, p->second.revision); + load(ApplicationHelper(_communicator, v.descriptor), entries, v.uuid, v.revision); } catch(const DeploymentException& ex) { Ice::Error err(_traceLevels->logger); - err << "invalid application `" << p->first << "':\n" << ex.reason; + err << "invalid application `" << k << "':\n" << ex.reason; } } @@ -257,9 +264,30 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); - _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, toMap(_applications), getSerial(_connection, applicationsDbName)); - _adapterObserverTopic = new AdapterObserverTopic(_topicManager, toMap(_adapters), getSerial(_connection, adaptersDbName)); - _objectObserverTopic = new ObjectObserverTopic(_topicManager, toMap(_objects), getSerial(_connection, objectsDbName)); + + // Set all serials to 1 if they have not yet been set. + Ice::Long serial; + if(!_serials.get(txn, applicationsDbName, serial)) + { + _serials.put(txn, applicationsDbName, 1); + } + if(!_serials.get(txn, adaptersDbName, serial)) + { + _serials.put(txn, adaptersDbName, 1); + } + if(!_serials.get(txn, objectsDbName, serial)) + { + _serials.put(txn, objectsDbName, 1); + } + + _applicationObserverTopic = + new ApplicationObserverTopic(_topicManager, toMap(txn, _applications), getSerial(txn, applicationsDbName)); + _adapterObserverTopic = + new AdapterObserverTopic(_topicManager, toMap(txn, _adapters), getSerial(txn, adaptersDbName)); + _objectObserverTopic = + new ObjectObserverTopic(_topicManager, toMap(txn, _objects), getSerial(txn, objectsDbName)); + + txn.commit(); _registryObserverTopic->registryUp(info); @@ -353,29 +381,24 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long Lock sync(*this); map<string, ApplicationInfo> oldApplications; - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - oldApplications = toMap(_applications); - _applications.clear(); - for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) - { - _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); - } - dbSerial = updateSerial(_connection, applicationsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + oldApplications = toMap(txn, _applications); + _applications.clear(txn); + for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) { - halt(_communicator, ex); + _applications.put(txn, p->descriptor.name, *p); } + dbSerial = updateSerial(txn, applicationsDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } ServerEntrySeq entries; @@ -433,28 +456,28 @@ Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial) int serial = 0; { Lock sync(*this); - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - _adapters.clear(); - for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) - { - _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); - } - dbSerial = updateSerial(_connection, adaptersDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + _adapters.clear(txn); + _adaptersByGroupId.clear(txn); + for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) { - halt(_communicator, ex); + addAdapter(txn, *r); } + dbSerial = updateSerial(txn, adaptersDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::KeyTooLongException&) + { + throw; + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } if(_traceLevels->adapter > 0) @@ -475,28 +498,24 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial) int serial = 0; { Lock sync(*this); - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - _objects.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) - { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); - } - dbSerial = updateSerial(_connection, objectsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + _objects.clear(txn); + _objectsByType.clear(txn); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { - halt(_communicator, ex); + addObject(txn, *q, false); } + dbSerial = updateSerial(txn, objectsDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } if(_traceLevels->object > 0) @@ -511,82 +530,67 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial) } ApplicationInfoSeq -Database::getApplications(Ice::Long& serial) const +Database::getApplications(Ice::Long& serial) { - for(;;) + try { - try - { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - TransactionHolder txHolder(connection); - StringApplicationInfoDict applications(connection, applicationsDbName); - serial = getSerial(connection, applicationsDbName); - return toVector(applications); - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); - } + IceDB::ReadOnlyTxn txn(_env); + + serial = getSerial(txn, applicationsDbName); + return toVector(txn, _applications); } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; + } + assert(false); + return ApplicationInfoSeq(); } AdapterInfoSeq -Database::getAdapters(Ice::Long& serial) const +Database::getAdapters(Ice::Long& serial) { - for(;;) + try { - try - { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - TransactionHolder txHolder(connection); - StringAdapterInfoDict adapters(connection, adaptersDbName); - serial = getSerial(connection, adaptersDbName); - return toVector(adapters); - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); - } + IceDB::ReadOnlyTxn txn(_env); + + serial = getSerial(txn, adaptersDbName); + return toVector(txn, _adapters); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } + assert(false); + return AdapterInfoSeq(); } ObjectInfoSeq -Database::getObjects(Ice::Long& serial) const +Database::getObjects(Ice::Long& serial) { - for(;;) + try { - try - { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - TransactionHolder txHolder(connection); - IdentityObjectInfoDict objects(connection, objectsDbName); - serial = getSerial(connection, objectsDbName); - return toVector(objects); - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); - } + IceDB::ReadOnlyTxn txn(_env); + + serial = getSerial(txn, objectsDbName); + return toVector(txn, _objects); } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; + } + assert(false); + return ObjectInfoSeq(); } StringLongDict Database::getSerials() const { - SerialsDict serials(Freeze::createConnection(_communicator, _envName), serialsDbName); - return toMap(serials); + IceDB::ReadOnlyTxn txn(_env); + return toMap(txn, _serials); } void @@ -603,24 +607,33 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic waitForUpdate(info.descriptor.name); - StringApplicationInfoDict::const_iterator i = _applications.find(info.descriptor.name); - if(i != _applications.end()) + IceDB::ReadWriteTxn txn(_env); + + if(_applications.find(txn, info.descriptor.name)) { throw DeploymentException("application `" + info.descriptor.name + "' already exists"); } ApplicationHelper helper(_communicator, info.descriptor, true); - checkForAddition(helper, _connection); - dbSerial = saveApplication(info, _connection, dbSerial); + checkForAddition(helper, txn); + dbSerial = saveApplication(info, txn, dbSerial); + + txn.commit(); + load(helper, entries, info.uuid, info.revision); startUpdating(info.descriptor.name, info.uuid, info.revision); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); serial = _applicationObserverTopic->applicationAdded(dbSerial, info); } - catch(const DatabaseException& ex) + catch(const IceDB::KeyTooLongException& ex) + { + throw DeploymentException("application name `" + info.descriptor.name + "' is too long: " + ex.what()); + } + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); + throw; } _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated. @@ -659,7 +672,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); - dbSerial = removeApplication(info.descriptor.name, _connection); + + IceDB::ReadWriteTxn txn(_env); + dbSerial = removeApplication(info.descriptor.name, txn); + txn.commit(); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name); @@ -669,10 +685,11 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic Ice::Error err(_traceLevels->logger); err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason; } - catch(const DatabaseException& ex) + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); } + _applicationObserverTopic->waitForSyncedSubscribers(serial); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); finishUpdating(info.descriptor.name); @@ -705,12 +722,12 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A waitForUpdate(update.descriptor.name); - StringApplicationInfoDict::const_iterator i = _applications.find(update.descriptor.name); - if(i == _applications.end()) + IceDB::ReadOnlyTxn txn(_env); + + if(!_applications.get(txn, update.descriptor.name, oldApp)) { throw ApplicationNotExistException(update.descriptor.name); } - oldApp = i->second; if(update.revision < 0) { @@ -722,9 +739,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } - catch(const DatabaseException& ex) + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); + throw; } finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial); @@ -746,12 +764,12 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n waitForUpdate(newDesc.name); - StringApplicationInfoDict::const_iterator i = _applications.find(newDesc.name); - if(i == _applications.end()) + IceDB::ReadOnlyTxn txn(_env); + + if(!_applications.get(txn, newDesc.name, oldApp)) { throw ApplicationNotExistException(newDesc.name); } - oldApp = i->second; previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor)); helper.reset(new ApplicationHelper(_communicator, newDesc, true)); @@ -763,9 +781,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } - catch(const DatabaseException& ex) + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); + throw; } finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart); @@ -791,13 +810,12 @@ Database::instantiateServer(const string& application, waitForUpdate(application); - StringApplicationInfoDict::const_iterator i = _applications.find(application); - if(i == _applications.end()) + IceDB::ReadOnlyTxn txn(_env); + + if(!_applications.get(txn, application, oldApp)) { throw ApplicationNotExistException(application); - } - oldApp = i->second; previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor)); helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true)); @@ -809,9 +827,10 @@ Database::instantiateServer(const string& application, startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } - catch(const DatabaseException& ex) + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); + throw; } finishApplicationUpdate(update, oldApp, *previous, *helper, session, true); @@ -833,12 +852,12 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon ApplicationInfo appInfo; - StringApplicationInfoDict::const_iterator i = _applications.find(name); - if(i == _applications.end()) + IceDB::ReadWriteTxn txn(_env); + + if(!_applications.get(txn, name, appInfo)) { throw ApplicationNotExistException(name); } - appInfo = i->second; bool init = false; try @@ -855,17 +874,21 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon throw; } } + dbSerial = removeApplication(name, txn, dbSerial); + + txn.commit(); - dbSerial = removeApplication(name, _connection, dbSerial); startUpdating(name, appInfo.uuid, appInfo.revision); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); serial = _applicationObserverTopic->applicationRemoved(dbSerial, name); } - catch(const DatabaseException& ex) + catch(const IceDB::LMDBException& ex) { - halt(_communicator, ex); + logError(_communicator, ex); + throw; } + _applicationObserverTopic->waitForSyncedSubscribers(serial); if(_master) @@ -885,22 +908,21 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon ApplicationInfo Database::getApplicationInfo(const std::string& name) { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringApplicationInfoDict applications(connection, applicationsDbName); - StringApplicationInfoDict::const_iterator i = applications.find(name); - if(i == applications.end()) + IceDB::ReadOnlyTxn txn(_env); + + ApplicationInfo info; + if(!_applications.get(txn, name, info)) { throw ApplicationNotExistException(name); } - return i->second; + return info; } Ice::StringSeq Database::getAllApplications(const string& expression) { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringApplicationInfoDict applications(connection, applicationsDbName); - return getMatchingKeys<map<string, ApplicationInfo> >(toMap(applications), expression); + IceDB::ReadOnlyTxn txn(_env); + return getMatchingKeys<map<string, ApplicationInfo> >(toMap(txn, _applications), expression); } void @@ -989,44 +1011,42 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr info.replicaGroupId = replicaGroupId; bool updated = false; - for(;;) + try { - try + IceDB::ReadWriteTxn txn(_env); + + AdapterInfo oldInfo; + bool found = _adapters.get(txn, adapterId, oldInfo); + if(proxy) { - TransactionHolder txHolder(_connection); - StringAdapterInfoDict::iterator i = _adapters.find(adapterId); - if(proxy) + updated = found; + + if(replicaGroupId != oldInfo.replicaGroupId) { - if(i == _adapters.end()) - { - _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); - } - else - { - updated = true; - i.set(info); - } + _adaptersByGroupId.del(txn, oldInfo.replicaGroupId, adapterId); } - else - { - if(i == _adapters.end()) - { - return; - } - _adapters.erase(i); - } - dbSerial = updateSerial(_connection, adaptersDbName, dbSerial); - txHolder.commit(); - break; + addAdapter(txn, info); } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + else { - halt(_communicator, ex); + if(!found) + { + return; + } + deleteAdapter(txn, oldInfo); } + dbSerial = updateSerial(txn, adaptersDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::KeyTooLongException&) + { + throw; + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } if(_traceLevels->adapter > 0) @@ -1037,7 +1057,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { out << " with replica group `" << replicaGroupId << "'"; } - out << " (serial = `" << dbSerial << "')"; + out << " (serial = `" << dbSerial << "')"; } if(proxy) @@ -1063,16 +1083,16 @@ Ice::ObjectPrx Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, adaptersDbName); - StringAdapterInfoDict::const_iterator i = adapters.find(id); - if(i != adapters.end()) + IceDB::ReadOnlyTxn txn(_env); + + AdapterInfo info; + if(_adapters.get(txn, id, info)) { - return i->second.proxy; + return info.proxy; } Ice::EndpointSeq endpoints; - vector<AdapterInfo> infos = findByReplicaGroupId(adapters, id); + vector<AdapterInfo> infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id); filterAdapterInfos("", id, _pluginFacade, con, ctx, infos); for(unsigned int i = 0; i < infos.size(); ++i) { @@ -1109,41 +1129,41 @@ Database::removeAdapter(const string& adapterId) AdapterInfoSeq infos; Ice::Long dbSerial = 0; - for(;;) + try { - try + IceDB::ReadWriteTxn txn(_env); + + AdapterInfo info; + if(_adapters.get(txn, adapterId, info)) { - TransactionHolder txHolder(_connection); - StringAdapterInfoDict::iterator i = _adapters.find(adapterId); - if(i != _adapters.end()) + deleteAdapter(txn, info); + } + else + { + infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, adapterId); + if(infos.empty()) { - _adapters.erase(i); + throw AdapterNotExistException(adapterId); } - else + for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p) { - infos = findByReplicaGroupId(_adapters, adapterId); - if(infos.empty()) - { - throw AdapterNotExistException(adapterId); - } - for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p) - { - p->replicaGroupId.clear(); - _adapters.put(StringAdapterInfoDict::value_type(p->id, *p)); - } + _adaptersByGroupId.del(txn, p->replicaGroupId, p->id); + p->replicaGroupId.clear(); + addAdapter(txn, *p); } - dbSerial = updateSerial(_connection, adaptersDbName); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); } + dbSerial = updateSerial(txn, adaptersDbName); + + txn.commit(); + } + catch(const IceDB::KeyTooLongException&) + { + throw; + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } if(_traceLevels->adapter > 0) @@ -1254,13 +1274,13 @@ Database::getAdapterInfo(const string& id) // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, adaptersDbName); + IceDB::ReadOnlyTxn txn(_env); + + AdapterInfo info; AdapterInfoSeq infos; - StringAdapterInfoDict::const_iterator i = adapters.find(id); - if(i != adapters.end()) + if(_adapters.get(txn, id, info)) { - infos.push_back(i->second); + infos.push_back(info); } else { @@ -1268,7 +1288,7 @@ Database::getAdapterInfo(const string& id) // If it's not a regular object adapter, perhaps it's a replica // group... // - infos = findByReplicaGroupId(adapters, id); + infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id); if(infos.empty()) { throw AdapterNotExistException(id); @@ -1310,13 +1330,13 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, adaptersDbName); + IceDB::ReadOnlyTxn txn(_env); + + AdapterInfo info; AdapterInfoSeq infos; - StringAdapterInfoDict::const_iterator i = adapters.find(id); - if(i != adapters.end()) + if(_adapters.get(txn, id, info)) { - infos.push_back(i->second); + infos.push_back(info); } else { @@ -1324,7 +1344,7 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con // If it's not a regular object adapter, perhaps it's a replica // group... // - infos = findByReplicaGroupId(adapters, id); + infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id); if(infos.empty()) { throw AdapterNotExistException(id); @@ -1393,18 +1413,25 @@ Database::getAllAdapters(const string& expression) result.swap(ids); set<string> groups; - for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + IceDB::ReadOnlyTxn txn(_env); + + string name; + AdapterInfo info; + AdapterMapROCursor cursor(_adapters, txn); + while(cursor.get(name, info, MDB_NEXT)) { - if(expression.empty() || IceUtilInternal::match(p->first, expression, true)) + if(expression.empty() || IceUtilInternal::match(name, expression, true)) { - result.push_back(p->first); + result.push_back(name); } - string replicaGroupId = p->second.replicaGroupId; + string replicaGroupId = info.replicaGroupId; if(!replicaGroupId.empty() && (expression.empty() || IceUtilInternal::match(replicaGroupId, expression, true))) { groups.insert(replicaGroupId); } } + cursor.close(); + // // COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun. // @@ -1432,29 +1459,23 @@ Database::addObject(const ObjectInfo& info) } Ice::Long dbSerial = 0; - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::const_iterator i = _objects.find(id); - if(i != _objects.end()) - { - throw ObjectExistsException(id); - } - _objects.put(IdentityObjectInfoDict::value_type(id, info)); - dbSerial = updateSerial(_connection, objectsDbName); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + if(_objects.find(txn, id)) { - halt(_communicator, ex); + throw ObjectExistsException(id); } + addObject(txn, info, false); + dbSerial = updateSerial(txn, objectsDbName); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } serial = _objectObserverTopic->objectAdded(dbSerial, info); @@ -1484,33 +1505,26 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) } bool update = false; - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::iterator i = _objects.find(id); - if(i != _objects.end()) - { - update = true; - i.set(info); - } - else - { - _objects.put(IdentityObjectInfoDict::value_type(id, info)); - } - dbSerial = updateSerial(_connection, objectsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + Ice::Identity k; + ObjectInfo v; + update = _objects.get(txn, k, v); + if(update) { - halt(_communicator, ex); + _objectsByType.del(txn, v.type, v.proxy->ice_getIdentity()); } + addObject(txn, info, false); + dbSerial = updateSerial(txn, objectsDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } if(update) @@ -1549,32 +1563,26 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) throw ex; } - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::iterator i = _objects.find(id); - if(i == _objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } + IceDB::ReadWriteTxn txn(_env); - _objects.erase(i); - dbSerial = updateSerial(_connection, objectsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) + ObjectInfo info; + if(!_objects.get(txn, id, info)) { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + deleteObject(txn, info, false); + dbSerial = updateSerial(txn, objectsDbName, dbSerial); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } serial = _objectObserverTopic->objectRemoved(dbSerial, id); @@ -1610,33 +1618,26 @@ Database::updateObject(const Ice::ObjectPrx& proxy) ObjectInfo info; Ice::Long dbSerial = 0; - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::iterator i = _objects.find(id); - if(i == _objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - info = i->second; - info.proxy = proxy; - i.set(info); - dbSerial = updateSerial(_connection, objectsDbName); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + if(!_objects.get(txn, id, info)) { - halt(_communicator, ex); + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + info.proxy = proxy; + addObject(txn, info, false); + dbSerial = updateSerial(txn, objectsDbName); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } serial = _objectObserverTopic->objectUpdated(dbSerial, info); @@ -1653,27 +1654,27 @@ int Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects) { Lock sync(*this); - for(;;) + try { - try + IceDB::ReadWriteTxn txn(_env); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - TransactionHolder txHolder(_connection); - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + Ice::Identity id = p->proxy->ice_getIdentity(); + ObjectInfo info; + if(_objects.get(txn, id, info)) { - _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); + _objectsByType.del(txn, info.type, id); } - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); + addObject(txn, *p, false); } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } + return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects); } @@ -1681,27 +1682,26 @@ int Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects) { Lock sync(*this); - for(;;) + try { - try + IceDB::ReadWriteTxn txn(_env); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - TransactionHolder txHolder(_connection); - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + Ice::Identity id = p->proxy->ice_getIdentity(); + ObjectInfo info; + if(_objects.get(txn, id, info)) { - _objects.erase(p->proxy->ice_getIdentity()); + deleteObject(txn, info, false); } - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); } + txn.commit(); } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; + } + return _objectObserverTopic->wellKnownObjectsRemoved(objects); } @@ -1719,16 +1719,15 @@ Database::getObjectProxy(const Ice::Identity& id) { } - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, objectsDbName); - IdentityObjectInfoDict::const_iterator i = objects.find(id); - if(i == objects.end()) + IceDB::ReadOnlyTxn txn(_env); + ObjectInfo info; + if(!_objects.get(txn, id, info)) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - return i->second.proxy; + return info.proxy; } Ice::ObjectPrx @@ -1780,9 +1779,8 @@ Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, co { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, objectsDbName); - vector<ObjectInfo> infos = findByType(objects, type); + IceDB::ReadOnlyTxn txn(_env); + vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1814,14 +1812,13 @@ Database::getObjectInfo(const Ice::Identity& id) { } - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, objectsDbName); - IdentityObjectInfoDict::const_iterator i = objects.find(id); - if(i == objects.end()) + IceDB::ReadOnlyTxn txn(_env); + ObjectInfo info; + if(!_objects.get(txn, id, info)) { throw ObjectNotRegisteredException(id); } - return i->second; + return info; } ObjectInfoSeq @@ -1829,13 +1826,16 @@ Database::getAllObjectInfos(const string& expression) { ObjectInfoSeq infos = _objectCache.getAll(expression); - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, objectsDbName); - for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) + IceDB::ReadOnlyTxn txn(_env); + + Ice::Identity id; + ObjectInfo info; + ObjectsMapROCursor cursor(_objects, txn); + while(cursor.get(id, info, MDB_NEXT)) { - if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true)) + if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(id), expression, true)) { - infos.push_back(p->second); + infos.push_back(info); } } return infos; @@ -1846,9 +1846,8 @@ Database::getObjectInfosByType(const string& type) { ObjectInfoSeq infos = _objectCache.getAllByType(type); - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, objectsDbName); - ObjectInfoSeq dbInfos = findByType(objects, type); + IceDB::ReadOnlyTxn txn(_env); + ObjectInfoSeq dbInfos = findByType(txn, _objects, _objectsByType, type); for(unsigned int i = 0; i < dbInfos.size(); ++i) { infos.push_back(dbInfos[i]); @@ -1862,31 +1861,27 @@ Database::addInternalObject(const ObjectInfo& info, bool replace) Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - for(;;) + try { - try + IceDB::ReadWriteTxn txn(_env); + + ObjectInfo oldInfo; + if(_internalObjects.get(txn, id, oldInfo)) { - TransactionHolder txHolder(_connection); if(!replace) { - IdentityObjectInfoDict::const_iterator i = _internalObjects.find(id); - if(i != _internalObjects.end()) - { - throw ObjectExistsException(id); - } + throw ObjectExistsException(id); } - _internalObjects.put(IdentityObjectInfoDict::value_type(id, info)); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); + _internalObjectsByType.del(txn, oldInfo.type, id); } + addObject(txn, info, true); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } } @@ -1895,30 +1890,25 @@ Database::removeInternalObject(const Ice::Identity& id) { Lock sync(*this); - for(;;) + try { - try - { - TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::iterator i = _internalObjects.find(id); - if(i == _internalObjects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - _internalObjects.erase(i); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + IceDB::ReadWriteTxn txn(_env); + + ObjectInfo info; + if(!_internalObjects.get(txn, id, info)) { - halt(_communicator, ex); + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + deleteObject(txn, info, true); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; } } @@ -1927,9 +1917,8 @@ Database::getInternalObjectsByType(const string& type) { Ice::ObjectProxySeq proxies; - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict internalObjects(connection, internalObjectsDbName); - vector<ObjectInfo> infos = findByType(internalObjects, type); + IceDB::ReadOnlyTxn txn(_env); + vector<ObjectInfo> infos = findByType(txn, _internalObjects, _internalObjectsByType, type); for(unsigned int i = 0; i < infos.size(); ++i) { proxies.push_back(infos[i].proxy); @@ -1938,7 +1927,7 @@ Database::getInternalObjectsByType(const string& type) } void -Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& connection) +Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteTxn& txn) { set<string> serverIds; set<string> adapterIds; @@ -1949,18 +1938,16 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition)); if(!adapterIds.empty()) { - StringAdapterInfoDict adapters(connection, adaptersDbName); for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p) { - checkAdapterForAddition(*p, adapters); + checkAdapterForAddition(*p, txn); } } if(!objectIds.empty()) { - IdentityObjectInfoDict objects(connection, objectsDbName); for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p) { - checkObjectForAddition(*p, objects); + checkObjectForAddition(*p, txn); } } @@ -1973,7 +1960,7 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co void Database::checkForUpdate(const ApplicationHelper& origApp, const ApplicationHelper& newApp, - const ConnectionPtr& connection) + const IceDB::ReadWriteTxn& txn) { set<string> oldSvrs, newSvrs; set<string> oldAdpts, newAdpts; @@ -1990,10 +1977,9 @@ Database::checkForUpdate(const ApplicationHelper& origApp, set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts)); if(!addedAdpts.empty()) { - StringAdapterInfoDict adapters(connection, adaptersDbName); for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p) { - checkAdapterForAddition(*p, adapters); + checkAdapterForAddition(*p, txn); } } @@ -2001,10 +1987,9 @@ Database::checkForUpdate(const ApplicationHelper& origApp, set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs)); if(!addedObjs.empty()) { - IdentityObjectInfoDict objects(connection, objectsDbName); for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p) { - checkObjectForAddition(*p, objects); + checkObjectForAddition(*p, txn); } } @@ -2054,7 +2039,7 @@ Database::checkServerForAddition(const string& id) } void -Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& adapters) +Database::checkAdapterForAddition(const string& id, const IceDB::ReadWriteTxn& txn) { bool found = false; if(_adapterCache.has(id)) @@ -2063,14 +2048,13 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& } else { - StringAdapterInfoDict::const_iterator i = adapters.find(id); - if(i != adapters.end()) + if(_adapters.find(txn, id)) { found = true; } else { - if(!findByReplicaGroupId(adapters, id).empty()) + if(!findByReplicaGroupId(txn, _adapters,_adaptersByGroupId, id).empty()) { found = true; } @@ -2086,7 +2070,8 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& } void -Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityObjectInfoDict& objects) +Database::checkObjectForAddition(const Ice::Identity& objectId, + const IceDB::ReadWriteTxn& txn) { bool found = false; if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId)) @@ -2095,8 +2080,7 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityOb } else { - IdentityObjectInfoDict::const_iterator i = objects.find(objectId); - if(i != objects.end()) + if(_objects.find(txn, objectId)) { found = true; } @@ -2342,57 +2326,19 @@ Database::reload(const ApplicationHelper& oldApp, } Ice::Long -Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& connection, Ice::Long dbSerial) +Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); - for(;;) - { - try - { - StringApplicationInfoDict applications(connection, applicationsDbName); - TransactionHolder txHolder(connection); - applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); - dbSerial = updateSerial(connection, applicationsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); - } - } - return dbSerial; + _applications.put(txn, info.descriptor.name, info); + return updateSerial(txn, applicationsDbName, dbSerial); } Ice::Long -Database::removeApplication(const string& name, const ConnectionPtr& connection, Ice::Long dbSerial) +Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); - for(;;) - { - try - { - StringApplicationInfoDict applications(connection, applicationsDbName); - TransactionHolder txHolder(connection); - applications.erase(name); - dbSerial = updateSerial(connection, applicationsDbName, dbSerial); - txHolder.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_communicator, ex); - } - } - return dbSerial; + _applications.del(txn, name); + return updateSerial(txn, applicationsDbName, dbSerial); } void @@ -2598,7 +2544,6 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, Ice::Long dbSerial) { const ApplicationDescriptor& newDesc = helper.getDefinition(); - ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); ServerEntrySeq entries; int serial = 0; @@ -2610,7 +2555,10 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, } Lock sync(*this); - checkForUpdate(previous, helper, connection); + + IceDB::ReadWriteTxn txn(_env); + + checkForUpdate(previous, helper, txn); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1, noRestart); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); @@ -2620,7 +2568,9 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, info.updateUser = update.updateUser; info.revision = update.revision; info.descriptor = newDesc; - dbSerial = saveApplication(info, connection, dbSerial); + dbSerial = saveApplication(info, txn, dbSerial); + + txn.commit(); serial = _applicationObserverTopic->applicationUpdated(dbSerial, update); } @@ -2629,6 +2579,11 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, finishUpdating(update.descriptor.name); throw; } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + throw; + } _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated. @@ -2670,7 +2625,18 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, ApplicationInfo info = oldApp; info.revision = update.revision + 1; - dbSerial = saveApplication(info, connection); + + try + { + IceDB::ReadWriteTxn txn(_env); + dbSerial = saveApplication(info, txn); + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_communicator, ex); + } + reload(previous, helper, entries, info.uuid, info.revision, noRestart); newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds(); @@ -2729,3 +2695,97 @@ Database::finishUpdating(const string& name) _updating.erase(p); notifyAll(); } + +Ice::Long +Database::getSerial(const IceDB::Txn& txn, const string& dbName) +{ + Ice::Long serial = 1; + _serials.get(txn, dbName, serial); + return serial; +} + +Ice::Long +Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, Ice::Long serial) +{ + if(serial == -1) // The master we are talking to doesn't support serials (old IceGrid versions) + { + return -1; + } + + // + // If a serial number is set, just update the serial number from the database, + // otherwise if the serial is 0, we increment the serial from the database. + // + if(serial > 0) + { + _serials.put(txn, dbName, serial); + return serial; + } + else + { + Ice::Long dbSerial = getSerial(txn, dbName) + 1; + _serials.put(txn, dbName, dbSerial); + return dbSerial; + } +} + +void +Database::addAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info) +{ + _adapters.put(txn, info.id, info); + _adaptersByGroupId.put(txn, info.replicaGroupId, info.id); +} + +void +Database::deleteAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info) +{ + + _adapters.del(txn, info.id); + _adaptersByGroupId.del(txn, info.replicaGroupId, info.id); +} + +void +Database::addObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal) +{ + if(internal) + { + _internalObjects.put(txn, info.proxy->ice_getIdentity(), info); + _internalObjectsByType.put(txn, info.type, info.proxy->ice_getIdentity()); + } + else + { + try + { + _objects.put(txn, info.proxy->ice_getIdentity(), info); + } + catch(const IceDB::KeyTooLongException& ex) + { + throw DeploymentException("object identity `" + + _communicator->identityToString(info.proxy->ice_getIdentity()) + + "' is too long: " + ex.what()); + } + try + { + _objectsByType.put(txn, info.type, info.proxy->ice_getIdentity()); + } + catch(const IceDB::KeyTooLongException& ex) + { + throw DeploymentException("object type `" + info.type + "' is too long: " + ex.what()); + } + } +} + +void +Database::deleteObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal) +{ + if(internal) + { + _internalObjects.del(txn, info.proxy->ice_getIdentity()); + _internalObjectsByType.del(txn, info.type, info.proxy->ice_getIdentity()); + } + else + { + _objects.del(txn, info.proxy->ice_getIdentity()); + _objectsByType.del(txn, info.type, info.proxy->ice_getIdentity()); + } +} |