summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp1200
1 files changed, 630 insertions, 570 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 8782672ac31..973f39272b3 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -22,7 +22,6 @@
#include <IceGrid/Session.h>
#include <IceGrid/Topics.h>
#include <IceGrid/IceGrid.h>
-#include <IceGrid/SerialsDict.h>
#include <algorithm>
#include <functional>
@@ -30,15 +29,23 @@
using namespace std;
using namespace IceGrid;
-using namespace Freeze;
+
+typedef IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream> ApplicationMapRWCursor;
+typedef IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream> AdapterMapROCursor;
+typedef IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream> AdaptersByGroupMapCursor;
+typedef IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> ObjectsByTypeMapROCursor;
+typedef IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream> ObjectsMapROCursor;
namespace
{
const string applicationsDbName = "applications";
const string adaptersDbName = "adapters";
+const string adaptersByReplicaGroupIdDbName = "adaptersByReplicaGroupId";
const string objectsDbName = "objects";
+const string objectsByTypeDbName = "objectsByType";
const string internalObjectsDbName = "internal-objects";
+const string internalObjectsByTypeDbName = "internal-objectsByType";
const string serialsDbName = "serials";
struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
@@ -49,42 +56,41 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
}
};
-template<typename K, typename V, typename C, typename Comp> vector<V>
-toVector(const Map<K, V, C, Comp>& m)
+template<typename K, typename V, typename C, typename H> vector<V>
+toVector(const IceDB::ReadOnlyTxn& txn, const IceDB::Dbi<K, V, C, H>& m)
{
vector<V> v;
- for(typename Map<K, V, C, Comp>::const_iterator p = m.begin(); p != m.end(); ++p)
+ IceDB::ReadOnlyCursor<K, V, C, H> cursor(m, txn);
+ K key;
+ V value;
+ while(cursor.get(key, value, MDB_NEXT))
{
- v.push_back(p->second);
+ v.push_back(value);
}
return v;
}
-template<typename K, typename V, typename C, typename Comp> map<K, V>
-toMap(const Map<K, V, C, Comp>& d)
+template<typename K, typename V, typename C, typename H> map<K, V>
+toMap(const IceDB::Txn& txn, const IceDB::Dbi<K, V, C, H>& d)
{
std::map<K, V> m;
- for(typename Map<K, V, C, Comp>::const_iterator p = d.begin(); p != d.end(); ++p)
+ IceDB::Cursor<K, V, C, H> cursor(d, txn);
+ K key;
+ V value;
+ while(cursor.get(key, value, MDB_NEXT))
{
-#ifdef __SUNPRO_CC
- std::map<Key, Value>::value_type v(p->first, p->second);
+ typename std::map<K, V>::value_type v(key, value);
m.insert(v);
-#else
- m.insert(*p);
-#endif
}
+ cursor.close();
return m;
}
void
-halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
+logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
{
- {
- Ice::Error error(com->getLogger());
- error << "fatal exception: " << ex << "\n*** Aborting application ***";
- }
-
- abort();
+ Ice::Error error(com->getLogger());
+ error << "LMDB error: " << ex;
}
void
@@ -134,69 +140,52 @@ filterAdapterInfos(const string& filter,
infos.swap(filteredAdpts);
}
-Ice::Long
-getSerial(const Freeze::ConnectionPtr& connection, const string& dbName)
-{
- SerialsDict dict(connection, serialsDbName);
-
- //
- // If a serial number is provided, juste update the serial number from the database,
- // otherwise if the serial is 0, we increment the serial from the database.
- //
- SerialsDict::iterator p = dict.find(dbName);
- if(p == dict.end())
- {
- dict.insert(SerialsDict::value_type(dbName, 1));
- return 1;
- }
- return p->second;
-}
-
-Ice::Long
-updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice::Long serial = 0)
-{
- if(serial == -1) // Master doesn't support serials.
- {
- return -1;
- }
-
- SerialsDict dict(connection, serialsDbName);
-
- //
- // If a serial number is provided, juste update the serial number from the database,
- // otherwise if the serial is 0, we increment the serial from the database.
- //
- SerialsDict::iterator p = dict.find(dbName);
- if(p == dict.end())
- {
- dict.insert(SerialsDict::value_type(dbName, serial == 0 ? 1 : serial));
- return 1;
- }
- else
- {
- p.set(serial == 0 ? p->second + 1 : serial);
- return p->second;
- }
-}
-
vector<AdapterInfo>
-findByReplicaGroupId(const StringAdapterInfoDict& dict, const string& name)
+findByReplicaGroupId(const IceDB::Txn& txn,
+ const StringAdapterInfoMap& adapters,
+ const StringStringMap& adaptersByGroupId,
+ const string& name)
{
vector<AdapterInfo> result;
- for(StringAdapterInfoDict::const_iterator p = dict.findByReplicaGroupId(name, true); p != dict.end(); ++p)
+ AdaptersByGroupMapCursor cursor(adaptersByGroupId, txn);
+ string id;
+ if(cursor.find(name, id))
{
- result.push_back(p->second);
+ AdapterInfo info;
+ adapters.get(txn, id, info);
+ result.push_back(info);
+
+ string n;
+ while(cursor.get(n, id, MDB_NEXT) && n == name)
+ {
+ adapters.get(txn, id, info);
+ result.push_back(info);
+ }
}
return result;
}
vector<ObjectInfo>
-findByType(const IdentityObjectInfoDict& dict, const string& type)
+findByType(const IceDB::ReadOnlyTxn& txn,
+ const IdentityObjectInfoMap& objects,
+ const StringIdentityMap& objectsByType,
+ const string& type)
{
vector<ObjectInfo> result;
- for(IdentityObjectInfoDict::const_iterator p = dict.findByType(type); p != dict.end(); ++p)
+ ObjectsByTypeMapROCursor cursor(objectsByType, txn);
+ Ice::Identity id;
+ if(cursor.find(type, id))
{
- result.push_back(p->second);
+ ObjectInfo info;
+ objects.get(txn, id, info);
+ result.push_back(info);
+
+ string t;
+ while(cursor.get(t, id, MDB_NEXT) && t == type)
+ {
+ objects.get(txn, id, info);
+ result.push_back(info);
+ }
}
return result;
}
@@ -208,8 +197,6 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
const string& instanceName,
const TraceLevelsPtr& traceLevels,
const RegistryInfo& info,
- const Freeze::ConnectionPtr& connection,
- const string& envName,
bool readonly) :
_communicator(registryAdapter->getCommunicator()),
_internalAdapter(registryAdapter),
@@ -224,27 +211,47 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objectCache(_communicator),
_allocatableObjectCache(_communicator),
_serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
- _connection(connection),
- _envName(envName),
- _applications(_connection, applicationsDbName),
- _adapters(_connection, adaptersDbName),
- _objects(_connection, objectsDbName),
- _internalObjects(_connection, internalObjectsDbName),
+ _dbLock(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path") + "/icedb.lock"),
+ _env(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path"), 8,
+ IceDB::getMapSize(_communicator->getProperties()->getPropertyAsInt("IceGrid.Registry.LMDB.MapSize"))),
_pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
_lock(0)
{
+ IceDB::ReadWriteTxn txn(_env);
+
+ IceDB::IceContext context;
+ context.communicator = _communicator;
+ context.encoding.major = 1;
+ context.encoding.minor = 1;
+
+ _applications = StringApplicationInfoMap(txn, applicationsDbName, context, MDB_CREATE);
+
+ _adapters = StringAdapterInfoMap(txn, adaptersDbName, context, MDB_CREATE);
+ _adaptersByGroupId = StringStringMap(txn, adaptersByReplicaGroupIdDbName, context, MDB_CREATE|MDB_DUPSORT);
+
+ _objects = IdentityObjectInfoMap(txn, objectsDbName, context, MDB_CREATE);
+ _objectsByType = StringIdentityMap(txn, objectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
+
+ _internalObjects = IdentityObjectInfoMap(txn, internalObjectsDbName, context, MDB_CREATE);
+ _internalObjectsByType = StringIdentityMap(txn, internalObjectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
+
+ _serials = StringLongMap(txn, serialsDbName, context, MDB_CREATE);
+
ServerEntrySeq entries;
- for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p)
+ string k;
+ ApplicationInfo v;
+ ApplicationMapRWCursor cursor(_applications, txn);
+ while(cursor.get(k, v, MDB_NEXT))
{
try
{
- load(ApplicationHelper(_communicator, p->second.descriptor), entries, p->second.uuid, p->second.revision);
+ load(ApplicationHelper(_communicator, v.descriptor), entries, v.uuid, v.revision);
}
catch(const DeploymentException& ex)
{
Ice::Error err(_traceLevels->logger);
- err << "invalid application `" << p->first << "':\n" << ex.reason;
+ err << "invalid application `" << k << "':\n" << ex.reason;
}
}
@@ -257,9 +264,30 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
- _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, toMap(_applications), getSerial(_connection, applicationsDbName));
- _adapterObserverTopic = new AdapterObserverTopic(_topicManager, toMap(_adapters), getSerial(_connection, adaptersDbName));
- _objectObserverTopic = new ObjectObserverTopic(_topicManager, toMap(_objects), getSerial(_connection, objectsDbName));
+
+ // Set all serials to 1 if they have not yet been set.
+ Ice::Long serial;
+ if(!_serials.get(txn, applicationsDbName, serial))
+ {
+ _serials.put(txn, applicationsDbName, 1);
+ }
+ if(!_serials.get(txn, adaptersDbName, serial))
+ {
+ _serials.put(txn, adaptersDbName, 1);
+ }
+ if(!_serials.get(txn, objectsDbName, serial))
+ {
+ _serials.put(txn, objectsDbName, 1);
+ }
+
+ _applicationObserverTopic =
+ new ApplicationObserverTopic(_topicManager, toMap(txn, _applications), getSerial(txn, applicationsDbName));
+ _adapterObserverTopic =
+ new AdapterObserverTopic(_topicManager, toMap(txn, _adapters), getSerial(txn, adaptersDbName));
+ _objectObserverTopic =
+ new ObjectObserverTopic(_topicManager, toMap(txn, _objects), getSerial(txn, objectsDbName));
+
+ txn.commit();
_registryObserverTopic->registryUp(info);
@@ -353,29 +381,24 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
Lock sync(*this);
map<string, ApplicationInfo> oldApplications;
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- oldApplications = toMap(_applications);
- _applications.clear();
- for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
- {
- _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
- }
- dbSerial = updateSerial(_connection, applicationsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ oldApplications = toMap(txn, _applications);
+ _applications.clear(txn);
+ for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
{
- halt(_communicator, ex);
+ _applications.put(txn, p->descriptor.name, *p);
}
+ dbSerial = updateSerial(txn, applicationsDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
ServerEntrySeq entries;
@@ -433,28 +456,28 @@ Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
int serial = 0;
{
Lock sync(*this);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- _adapters.clear();
- for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
- {
- _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
- }
- dbSerial = updateSerial(_connection, adaptersDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ _adapters.clear(txn);
+ _adaptersByGroupId.clear(txn);
+ for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
{
- halt(_communicator, ex);
+ addAdapter(txn, *r);
}
+ dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::KeyTooLongException&)
+ {
+ throw;
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
if(_traceLevels->adapter > 0)
@@ -475,28 +498,24 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
int serial = 0;
{
Lock sync(*this);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- _objects.clear();
- for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
- {
- _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
- }
- dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ _objects.clear(txn);
+ _objectsByType.clear(txn);
+ for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- halt(_communicator, ex);
+ addObject(txn, *q, false);
}
+ dbSerial = updateSerial(txn, objectsDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
if(_traceLevels->object > 0)
@@ -511,82 +530,67 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
}
ApplicationInfoSeq
-Database::getApplications(Ice::Long& serial) const
+Database::getApplications(Ice::Long& serial)
{
- for(;;)
+ try
{
- try
- {
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- TransactionHolder txHolder(connection);
- StringApplicationInfoDict applications(connection, applicationsDbName);
- serial = getSerial(connection, applicationsDbName);
- return toVector(applications);
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
- }
+ IceDB::ReadOnlyTxn txn(_env);
+
+ serial = getSerial(txn, applicationsDbName);
+ return toVector(txn, _applications);
}
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
+ }
+ assert(false);
+ return ApplicationInfoSeq();
}
AdapterInfoSeq
-Database::getAdapters(Ice::Long& serial) const
+Database::getAdapters(Ice::Long& serial)
{
- for(;;)
+ try
{
- try
- {
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- TransactionHolder txHolder(connection);
- StringAdapterInfoDict adapters(connection, adaptersDbName);
- serial = getSerial(connection, adaptersDbName);
- return toVector(adapters);
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
- }
+ IceDB::ReadOnlyTxn txn(_env);
+
+ serial = getSerial(txn, adaptersDbName);
+ return toVector(txn, _adapters);
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
+ assert(false);
+ return AdapterInfoSeq();
}
ObjectInfoSeq
-Database::getObjects(Ice::Long& serial) const
+Database::getObjects(Ice::Long& serial)
{
- for(;;)
+ try
{
- try
- {
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- TransactionHolder txHolder(connection);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- serial = getSerial(connection, objectsDbName);
- return toVector(objects);
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
- }
+ IceDB::ReadOnlyTxn txn(_env);
+
+ serial = getSerial(txn, objectsDbName);
+ return toVector(txn, _objects);
}
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
+ }
+ assert(false);
+ return ObjectInfoSeq();
}
StringLongDict
Database::getSerials() const
{
- SerialsDict serials(Freeze::createConnection(_communicator, _envName), serialsDbName);
- return toMap(serials);
+ IceDB::ReadOnlyTxn txn(_env);
+ return toMap(txn, _serials);
}
void
@@ -603,24 +607,33 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
waitForUpdate(info.descriptor.name);
- StringApplicationInfoDict::const_iterator i = _applications.find(info.descriptor.name);
- if(i != _applications.end())
+ IceDB::ReadWriteTxn txn(_env);
+
+ if(_applications.find(txn, info.descriptor.name))
{
throw DeploymentException("application `" + info.descriptor.name + "' already exists");
}
ApplicationHelper helper(_communicator, info.descriptor, true);
- checkForAddition(helper, _connection);
- dbSerial = saveApplication(info, _connection, dbSerial);
+ checkForAddition(helper, txn);
+ dbSerial = saveApplication(info, txn, dbSerial);
+
+ txn.commit();
+
load(helper, entries, info.uuid, info.revision);
startUpdating(info.descriptor.name, info.uuid, info.revision);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
serial = _applicationObserverTopic->applicationAdded(dbSerial, info);
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::KeyTooLongException& ex)
+ {
+ throw DeploymentException("application name `" + info.descriptor.name + "' is too long: " + ex.what());
+ }
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
+ throw;
}
_applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
@@ -659,7 +672,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
Lock sync(*this);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
- dbSerial = removeApplication(info.descriptor.name, _connection);
+
+ IceDB::ReadWriteTxn txn(_env);
+ dbSerial = removeApplication(info.descriptor.name, txn);
+ txn.commit();
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
@@ -669,10 +685,11 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
Ice::Error err(_traceLevels->logger);
err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
}
+
_applicationObserverTopic->waitForSyncedSubscribers(serial);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
finishUpdating(info.descriptor.name);
@@ -705,12 +722,12 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
waitForUpdate(update.descriptor.name);
- StringApplicationInfoDict::const_iterator i = _applications.find(update.descriptor.name);
- if(i == _applications.end())
+ IceDB::ReadOnlyTxn txn(_env);
+
+ if(!_applications.get(txn, update.descriptor.name, oldApp))
{
throw ApplicationNotExistException(update.descriptor.name);
}
- oldApp = i->second;
if(update.revision < 0)
{
@@ -722,9 +739,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
+ throw;
}
finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial);
@@ -746,12 +764,12 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
waitForUpdate(newDesc.name);
- StringApplicationInfoDict::const_iterator i = _applications.find(newDesc.name);
- if(i == _applications.end())
+ IceDB::ReadOnlyTxn txn(_env);
+
+ if(!_applications.get(txn, newDesc.name, oldApp))
{
throw ApplicationNotExistException(newDesc.name);
}
- oldApp = i->second;
previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
helper.reset(new ApplicationHelper(_communicator, newDesc, true));
@@ -763,9 +781,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
+ throw;
}
finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
@@ -791,13 +810,12 @@ Database::instantiateServer(const string& application,
waitForUpdate(application);
- StringApplicationInfoDict::const_iterator i = _applications.find(application);
- if(i == _applications.end())
+ IceDB::ReadOnlyTxn txn(_env);
+
+ if(!_applications.get(txn, application, oldApp))
{
throw ApplicationNotExistException(application);
-
}
- oldApp = i->second;
previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
@@ -809,9 +827,10 @@ Database::instantiateServer(const string& application,
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
+ throw;
}
finishApplicationUpdate(update, oldApp, *previous, *helper, session, true);
@@ -833,12 +852,12 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
ApplicationInfo appInfo;
- StringApplicationInfoDict::const_iterator i = _applications.find(name);
- if(i == _applications.end())
+ IceDB::ReadWriteTxn txn(_env);
+
+ if(!_applications.get(txn, name, appInfo))
{
throw ApplicationNotExistException(name);
}
- appInfo = i->second;
bool init = false;
try
@@ -855,17 +874,21 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
throw;
}
}
+ dbSerial = removeApplication(name, txn, dbSerial);
+
+ txn.commit();
- dbSerial = removeApplication(name, _connection, dbSerial);
startUpdating(name, appInfo.uuid, appInfo.revision);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
serial = _applicationObserverTopic->applicationRemoved(dbSerial, name);
}
- catch(const DatabaseException& ex)
+ catch(const IceDB::LMDBException& ex)
{
- halt(_communicator, ex);
+ logError(_communicator, ex);
+ throw;
}
+
_applicationObserverTopic->waitForSyncedSubscribers(serial);
if(_master)
@@ -885,22 +908,21 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
ApplicationInfo
Database::getApplicationInfo(const std::string& name)
{
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringApplicationInfoDict applications(connection, applicationsDbName);
- StringApplicationInfoDict::const_iterator i = applications.find(name);
- if(i == applications.end())
+ IceDB::ReadOnlyTxn txn(_env);
+
+ ApplicationInfo info;
+ if(!_applications.get(txn, name, info))
{
throw ApplicationNotExistException(name);
}
- return i->second;
+ return info;
}
Ice::StringSeq
Database::getAllApplications(const string& expression)
{
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringApplicationInfoDict applications(connection, applicationsDbName);
- return getMatchingKeys<map<string, ApplicationInfo> >(toMap(applications), expression);
+ IceDB::ReadOnlyTxn txn(_env);
+ return getMatchingKeys<map<string, ApplicationInfo> >(toMap(txn, _applications), expression);
}
void
@@ -989,44 +1011,42 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
info.replicaGroupId = replicaGroupId;
bool updated = false;
- for(;;)
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_env);
+
+ AdapterInfo oldInfo;
+ bool found = _adapters.get(txn, adapterId, oldInfo);
+ if(proxy)
{
- TransactionHolder txHolder(_connection);
- StringAdapterInfoDict::iterator i = _adapters.find(adapterId);
- if(proxy)
+ updated = found;
+
+ if(replicaGroupId != oldInfo.replicaGroupId)
{
- if(i == _adapters.end())
- {
- _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- }
- else
- {
- updated = true;
- i.set(info);
- }
+ _adaptersByGroupId.del(txn, oldInfo.replicaGroupId, adapterId);
}
- else
- {
- if(i == _adapters.end())
- {
- return;
- }
- _adapters.erase(i);
- }
- dbSerial = updateSerial(_connection, adaptersDbName, dbSerial);
- txHolder.commit();
- break;
+ addAdapter(txn, info);
}
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ else
{
- halt(_communicator, ex);
+ if(!found)
+ {
+ return;
+ }
+ deleteAdapter(txn, oldInfo);
}
+ dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::KeyTooLongException&)
+ {
+ throw;
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
if(_traceLevels->adapter > 0)
@@ -1037,7 +1057,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
out << " with replica group `" << replicaGroupId << "'";
}
- out << " (serial = `" << dbSerial << "')";
+ out << " (serial = `" << dbSerial << "')";
}
if(proxy)
@@ -1063,16 +1083,16 @@ Ice::ObjectPrx
Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
const Ice::Context& ctx)
{
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, adaptersDbName);
- StringAdapterInfoDict::const_iterator i = adapters.find(id);
- if(i != adapters.end())
+ IceDB::ReadOnlyTxn txn(_env);
+
+ AdapterInfo info;
+ if(_adapters.get(txn, id, info))
{
- return i->second.proxy;
+ return info.proxy;
}
Ice::EndpointSeq endpoints;
- vector<AdapterInfo> infos = findByReplicaGroupId(adapters, id);
+ vector<AdapterInfo> infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
for(unsigned int i = 0; i < infos.size(); ++i)
{
@@ -1109,41 +1129,41 @@ Database::removeAdapter(const string& adapterId)
AdapterInfoSeq infos;
Ice::Long dbSerial = 0;
- for(;;)
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_env);
+
+ AdapterInfo info;
+ if(_adapters.get(txn, adapterId, info))
{
- TransactionHolder txHolder(_connection);
- StringAdapterInfoDict::iterator i = _adapters.find(adapterId);
- if(i != _adapters.end())
+ deleteAdapter(txn, info);
+ }
+ else
+ {
+ infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, adapterId);
+ if(infos.empty())
{
- _adapters.erase(i);
+ throw AdapterNotExistException(adapterId);
}
- else
+ for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p)
{
- infos = findByReplicaGroupId(_adapters, adapterId);
- if(infos.empty())
- {
- throw AdapterNotExistException(adapterId);
- }
- for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p)
- {
- p->replicaGroupId.clear();
- _adapters.put(StringAdapterInfoDict::value_type(p->id, *p));
- }
+ _adaptersByGroupId.del(txn, p->replicaGroupId, p->id);
+ p->replicaGroupId.clear();
+ addAdapter(txn, *p);
}
- dbSerial = updateSerial(_connection, adaptersDbName);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
}
+ dbSerial = updateSerial(txn, adaptersDbName);
+
+ txn.commit();
+ }
+ catch(const IceDB::KeyTooLongException&)
+ {
+ throw;
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
if(_traceLevels->adapter > 0)
@@ -1254,13 +1274,13 @@ Database::getAdapterInfo(const string& id)
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, adaptersDbName);
+ IceDB::ReadOnlyTxn txn(_env);
+
+ AdapterInfo info;
AdapterInfoSeq infos;
- StringAdapterInfoDict::const_iterator i = adapters.find(id);
- if(i != adapters.end())
+ if(_adapters.get(txn, id, info))
{
- infos.push_back(i->second);
+ infos.push_back(info);
}
else
{
@@ -1268,7 +1288,7 @@ Database::getAdapterInfo(const string& id)
// If it's not a regular object adapter, perhaps it's a replica
// group...
//
- infos = findByReplicaGroupId(adapters, id);
+ infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
if(infos.empty())
{
throw AdapterNotExistException(id);
@@ -1310,13 +1330,13 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, adaptersDbName);
+ IceDB::ReadOnlyTxn txn(_env);
+
+ AdapterInfo info;
AdapterInfoSeq infos;
- StringAdapterInfoDict::const_iterator i = adapters.find(id);
- if(i != adapters.end())
+ if(_adapters.get(txn, id, info))
{
- infos.push_back(i->second);
+ infos.push_back(info);
}
else
{
@@ -1324,7 +1344,7 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con
// If it's not a regular object adapter, perhaps it's a replica
// group...
//
- infos = findByReplicaGroupId(adapters, id);
+ infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
if(infos.empty())
{
throw AdapterNotExistException(id);
@@ -1393,18 +1413,25 @@ Database::getAllAdapters(const string& expression)
result.swap(ids);
set<string> groups;
- for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ IceDB::ReadOnlyTxn txn(_env);
+
+ string name;
+ AdapterInfo info;
+ AdapterMapROCursor cursor(_adapters, txn);
+ while(cursor.get(name, info, MDB_NEXT))
{
- if(expression.empty() || IceUtilInternal::match(p->first, expression, true))
+ if(expression.empty() || IceUtilInternal::match(name, expression, true))
{
- result.push_back(p->first);
+ result.push_back(name);
}
- string replicaGroupId = p->second.replicaGroupId;
+ string replicaGroupId = info.replicaGroupId;
if(!replicaGroupId.empty() && (expression.empty() || IceUtilInternal::match(replicaGroupId, expression, true)))
{
groups.insert(replicaGroupId);
}
}
+ cursor.close();
+
//
// COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun.
//
@@ -1432,29 +1459,23 @@ Database::addObject(const ObjectInfo& info)
}
Ice::Long dbSerial = 0;
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::const_iterator i = _objects.find(id);
- if(i != _objects.end())
- {
- throw ObjectExistsException(id);
- }
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
- dbSerial = updateSerial(_connection, objectsDbName);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ if(_objects.find(txn, id))
{
- halt(_communicator, ex);
+ throw ObjectExistsException(id);
}
+ addObject(txn, info, false);
+ dbSerial = updateSerial(txn, objectsDbName);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
serial = _objectObserverTopic->objectAdded(dbSerial, info);
@@ -1484,33 +1505,26 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
}
bool update = false;
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::iterator i = _objects.find(id);
- if(i != _objects.end())
- {
- update = true;
- i.set(info);
- }
- else
- {
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
- }
- dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ Ice::Identity k;
+ ObjectInfo v;
+ update = _objects.get(txn, k, v);
+ if(update)
{
- halt(_communicator, ex);
+ _objectsByType.del(txn, v.type, v.proxy->ice_getIdentity());
}
+ addObject(txn, info, false);
+ dbSerial = updateSerial(txn, objectsDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
if(update)
@@ -1549,32 +1563,26 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
throw ex;
}
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::iterator i = _objects.find(id);
- if(i == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
+ IceDB::ReadWriteTxn txn(_env);
- _objects.erase(i);
- dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
+ ObjectInfo info;
+ if(!_objects.get(txn, id, info))
{
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ deleteObject(txn, info, false);
+ dbSerial = updateSerial(txn, objectsDbName, dbSerial);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
serial = _objectObserverTopic->objectRemoved(dbSerial, id);
@@ -1610,33 +1618,26 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
ObjectInfo info;
Ice::Long dbSerial = 0;
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::iterator i = _objects.find(id);
- if(i == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- info = i->second;
- info.proxy = proxy;
- i.set(info);
- dbSerial = updateSerial(_connection, objectsDbName);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ if(!_objects.get(txn, id, info))
{
- halt(_communicator, ex);
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ info.proxy = proxy;
+ addObject(txn, info, false);
+ dbSerial = updateSerial(txn, objectsDbName);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
serial = _objectObserverTopic->objectUpdated(dbSerial, info);
@@ -1653,27 +1654,27 @@ int
Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- for(;;)
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_env);
+ for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- TransactionHolder txHolder(_connection);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ Ice::Identity id = p->proxy->ice_getIdentity();
+ ObjectInfo info;
+ if(_objects.get(txn, id, info))
{
- _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
+ _objectsByType.del(txn, info.type, id);
}
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
+ addObject(txn, *p, false);
}
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
+
return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects);
}
@@ -1681,27 +1682,26 @@ int
Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- for(;;)
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_env);
+ for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- TransactionHolder txHolder(_connection);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ Ice::Identity id = p->proxy->ice_getIdentity();
+ ObjectInfo info;
+ if(_objects.get(txn, id, info))
{
- _objects.erase(p->proxy->ice_getIdentity());
+ deleteObject(txn, info, false);
}
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
}
+ txn.commit();
}
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
+ }
+
return _objectObserverTopic->wellKnownObjectsRemoved(objects);
}
@@ -1719,16 +1719,15 @@ Database::getObjectProxy(const Ice::Identity& id)
{
}
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- IdentityObjectInfoDict::const_iterator i = objects.find(id);
- if(i == objects.end())
+ IceDB::ReadOnlyTxn txn(_env);
+ ObjectInfo info;
+ if(!_objects.get(txn, id, info))
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- return i->second.proxy;
+ return info.proxy;
}
Ice::ObjectPrx
@@ -1780,9 +1779,8 @@ Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, co
{
Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- vector<ObjectInfo> infos = findByType(objects, type);
+ IceDB::ReadOnlyTxn txn(_env);
+ vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type);
for(unsigned int i = 0; i < infos.size(); ++i)
{
proxies.push_back(infos[i].proxy);
@@ -1814,14 +1812,13 @@ Database::getObjectInfo(const Ice::Identity& id)
{
}
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- IdentityObjectInfoDict::const_iterator i = objects.find(id);
- if(i == objects.end())
+ IceDB::ReadOnlyTxn txn(_env);
+ ObjectInfo info;
+ if(!_objects.get(txn, id, info))
{
throw ObjectNotRegisteredException(id);
}
- return i->second;
+ return info;
}
ObjectInfoSeq
@@ -1829,13 +1826,16 @@ Database::getAllObjectInfos(const string& expression)
{
ObjectInfoSeq infos = _objectCache.getAll(expression);
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ IceDB::ReadOnlyTxn txn(_env);
+
+ Ice::Identity id;
+ ObjectInfo info;
+ ObjectsMapROCursor cursor(_objects, txn);
+ while(cursor.get(id, info, MDB_NEXT))
{
- if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true))
+ if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(id), expression, true))
{
- infos.push_back(p->second);
+ infos.push_back(info);
}
}
return infos;
@@ -1846,9 +1846,8 @@ Database::getObjectInfosByType(const string& type)
{
ObjectInfoSeq infos = _objectCache.getAllByType(type);
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, objectsDbName);
- ObjectInfoSeq dbInfos = findByType(objects, type);
+ IceDB::ReadOnlyTxn txn(_env);
+ ObjectInfoSeq dbInfos = findByType(txn, _objects, _objectsByType, type);
for(unsigned int i = 0; i < dbInfos.size(); ++i)
{
infos.push_back(dbInfos[i]);
@@ -1862,31 +1861,27 @@ Database::addInternalObject(const ObjectInfo& info, bool replace)
Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
- for(;;)
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_env);
+
+ ObjectInfo oldInfo;
+ if(_internalObjects.get(txn, id, oldInfo))
{
- TransactionHolder txHolder(_connection);
if(!replace)
{
- IdentityObjectInfoDict::const_iterator i = _internalObjects.find(id);
- if(i != _internalObjects.end())
- {
- throw ObjectExistsException(id);
- }
+ throw ObjectExistsException(id);
}
- _internalObjects.put(IdentityObjectInfoDict::value_type(id, info));
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
+ _internalObjectsByType.del(txn, oldInfo.type, id);
}
+ addObject(txn, info, true);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
}
@@ -1895,30 +1890,25 @@ Database::removeInternalObject(const Ice::Identity& id)
{
Lock sync(*this);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::iterator i = _internalObjects.find(id);
- if(i == _internalObjects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- _internalObjects.erase(i);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ IceDB::ReadWriteTxn txn(_env);
+
+ ObjectInfo info;
+ if(!_internalObjects.get(txn, id, info))
{
- halt(_communicator, ex);
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ deleteObject(txn, info, true);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
}
}
@@ -1927,9 +1917,8 @@ Database::getInternalObjectsByType(const string& type)
{
Ice::ObjectProxySeq proxies;
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict internalObjects(connection, internalObjectsDbName);
- vector<ObjectInfo> infos = findByType(internalObjects, type);
+ IceDB::ReadOnlyTxn txn(_env);
+ vector<ObjectInfo> infos = findByType(txn, _internalObjects, _internalObjectsByType, type);
for(unsigned int i = 0; i < infos.size(); ++i)
{
proxies.push_back(infos[i].proxy);
@@ -1938,7 +1927,7 @@ Database::getInternalObjectsByType(const string& type)
}
void
-Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& connection)
+Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteTxn& txn)
{
set<string> serverIds;
set<string> adapterIds;
@@ -1949,18 +1938,16 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co
for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition));
if(!adapterIds.empty())
{
- StringAdapterInfoDict adapters(connection, adaptersDbName);
for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
{
- checkAdapterForAddition(*p, adapters);
+ checkAdapterForAddition(*p, txn);
}
}
if(!objectIds.empty())
{
- IdentityObjectInfoDict objects(connection, objectsDbName);
for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p)
{
- checkObjectForAddition(*p, objects);
+ checkObjectForAddition(*p, txn);
}
}
@@ -1973,7 +1960,7 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co
void
Database::checkForUpdate(const ApplicationHelper& origApp,
const ApplicationHelper& newApp,
- const ConnectionPtr& connection)
+ const IceDB::ReadWriteTxn& txn)
{
set<string> oldSvrs, newSvrs;
set<string> oldAdpts, newAdpts;
@@ -1990,10 +1977,9 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts));
if(!addedAdpts.empty())
{
- StringAdapterInfoDict adapters(connection, adaptersDbName);
for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p)
{
- checkAdapterForAddition(*p, adapters);
+ checkAdapterForAddition(*p, txn);
}
}
@@ -2001,10 +1987,9 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs));
if(!addedObjs.empty())
{
- IdentityObjectInfoDict objects(connection, objectsDbName);
for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p)
{
- checkObjectForAddition(*p, objects);
+ checkObjectForAddition(*p, txn);
}
}
@@ -2054,7 +2039,7 @@ Database::checkServerForAddition(const string& id)
}
void
-Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& adapters)
+Database::checkAdapterForAddition(const string& id, const IceDB::ReadWriteTxn& txn)
{
bool found = false;
if(_adapterCache.has(id))
@@ -2063,14 +2048,13 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict&
}
else
{
- StringAdapterInfoDict::const_iterator i = adapters.find(id);
- if(i != adapters.end())
+ if(_adapters.find(txn, id))
{
found = true;
}
else
{
- if(!findByReplicaGroupId(adapters, id).empty())
+ if(!findByReplicaGroupId(txn, _adapters,_adaptersByGroupId, id).empty())
{
found = true;
}
@@ -2086,7 +2070,8 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict&
}
void
-Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityObjectInfoDict& objects)
+Database::checkObjectForAddition(const Ice::Identity& objectId,
+ const IceDB::ReadWriteTxn& txn)
{
bool found = false;
if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId))
@@ -2095,8 +2080,7 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityOb
}
else
{
- IdentityObjectInfoDict::const_iterator i = objects.find(objectId);
- if(i != objects.end())
+ if(_objects.find(txn, objectId))
{
found = true;
}
@@ -2342,57 +2326,19 @@ Database::reload(const ApplicationHelper& oldApp,
}
Ice::Long
-Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& connection, Ice::Long dbSerial)
+Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
- for(;;)
- {
- try
- {
- StringApplicationInfoDict applications(connection, applicationsDbName);
- TransactionHolder txHolder(connection);
- applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
- dbSerial = updateSerial(connection, applicationsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
- }
- }
- return dbSerial;
+ _applications.put(txn, info.descriptor.name, info);
+ return updateSerial(txn, applicationsDbName, dbSerial);
}
Ice::Long
-Database::removeApplication(const string& name, const ConnectionPtr& connection, Ice::Long dbSerial)
+Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
- for(;;)
- {
- try
- {
- StringApplicationInfoDict applications(connection, applicationsDbName);
- TransactionHolder txHolder(connection);
- applications.erase(name);
- dbSerial = updateSerial(connection, applicationsDbName, dbSerial);
- txHolder.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_communicator, ex);
- }
- }
- return dbSerial;
+ _applications.del(txn, name);
+ return updateSerial(txn, applicationsDbName, dbSerial);
}
void
@@ -2598,7 +2544,6 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
Ice::Long dbSerial)
{
const ApplicationDescriptor& newDesc = helper.getDefinition();
- ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
ServerEntrySeq entries;
int serial = 0;
@@ -2610,7 +2555,10 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
}
Lock sync(*this);
- checkForUpdate(previous, helper, connection);
+
+ IceDB::ReadWriteTxn txn(_env);
+
+ checkForUpdate(previous, helper, txn);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
@@ -2620,7 +2568,9 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
info.updateUser = update.updateUser;
info.revision = update.revision;
info.descriptor = newDesc;
- dbSerial = saveApplication(info, connection, dbSerial);
+ dbSerial = saveApplication(info, txn, dbSerial);
+
+ txn.commit();
serial = _applicationObserverTopic->applicationUpdated(dbSerial, update);
}
@@ -2629,6 +2579,11 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
finishUpdating(update.descriptor.name);
throw;
}
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ throw;
+ }
_applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
@@ -2670,7 +2625,18 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
ApplicationInfo info = oldApp;
info.revision = update.revision + 1;
- dbSerial = saveApplication(info, connection);
+
+ try
+ {
+ IceDB::ReadWriteTxn txn(_env);
+ dbSerial = saveApplication(info, txn);
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_communicator, ex);
+ }
+
reload(previous, helper, entries, info.uuid, info.revision, noRestart);
newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
@@ -2729,3 +2695,97 @@ Database::finishUpdating(const string& name)
_updating.erase(p);
notifyAll();
}
+
+Ice::Long
+Database::getSerial(const IceDB::Txn& txn, const string& dbName)
+{
+ Ice::Long serial = 1;
+ _serials.get(txn, dbName, serial);
+ return serial;
+}
+
+Ice::Long
+Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, Ice::Long serial)
+{
+ if(serial == -1) // The master we are talking to doesn't support serials (old IceGrid versions)
+ {
+ return -1;
+ }
+
+ //
+ // If a serial number is set, just update the serial number from the database,
+ // otherwise if the serial is 0, we increment the serial from the database.
+ //
+ if(serial > 0)
+ {
+ _serials.put(txn, dbName, serial);
+ return serial;
+ }
+ else
+ {
+ Ice::Long dbSerial = getSerial(txn, dbName) + 1;
+ _serials.put(txn, dbName, dbSerial);
+ return dbSerial;
+ }
+}
+
+void
+Database::addAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
+{
+ _adapters.put(txn, info.id, info);
+ _adaptersByGroupId.put(txn, info.replicaGroupId, info.id);
+}
+
+void
+Database::deleteAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
+{
+
+ _adapters.del(txn, info.id);
+ _adaptersByGroupId.del(txn, info.replicaGroupId, info.id);
+}
+
+void
+Database::addObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
+{
+ if(internal)
+ {
+ _internalObjects.put(txn, info.proxy->ice_getIdentity(), info);
+ _internalObjectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
+ }
+ else
+ {
+ try
+ {
+ _objects.put(txn, info.proxy->ice_getIdentity(), info);
+ }
+ catch(const IceDB::KeyTooLongException& ex)
+ {
+ throw DeploymentException("object identity `" +
+ _communicator->identityToString(info.proxy->ice_getIdentity())
+ + "' is too long: " + ex.what());
+ }
+ try
+ {
+ _objectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
+ }
+ catch(const IceDB::KeyTooLongException& ex)
+ {
+ throw DeploymentException("object type `" + info.type + "' is too long: " + ex.what());
+ }
+ }
+}
+
+void
+Database::deleteObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
+{
+ if(internal)
+ {
+ _internalObjects.del(txn, info.proxy->ice_getIdentity());
+ _internalObjectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
+ }
+ else
+ {
+ _objects.del(txn, info.proxy->ice_getIdentity());
+ _objectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
+ }
+}