summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2014-04-18 18:31:48 +0200
committerJose <jose@zeroc.com>2014-04-18 18:31:48 +0200
commite7333297345efda9379045495d17aadb571ddd50 (patch)
tree5bfa86a78d29665e1ef50a9575b76114be1145e1 /cpp/src/IceGrid/Database.cpp
parentFixed replicaGroup test issue (diff)
downloadice-e7333297345efda9379045495d17aadb571ddd50.tar.bz2
ice-e7333297345efda9379045495d17aadb571ddd50.tar.xz
ice-e7333297345efda9379045495d17aadb571ddd50.zip
Fixed (ICE-4858) - Eliminate IceDB
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp579
1 files changed, 295 insertions, 284 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 216525c0314..25d513ac724 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -21,8 +21,8 @@
#include <IceGrid/ReplicaSessionI.h>
#include <IceGrid/Session.h>
#include <IceGrid/Topics.h>
-#include <IceGrid/DB.h>
#include <IceGrid/IceGrid.h>
+#include <IceGrid/SerialsDict.h>
#include <algorithm>
#include <functional>
@@ -30,12 +30,17 @@
using namespace std;
using namespace IceGrid;
-
-using namespace IceDB;
+using namespace Freeze;
namespace
{
+const string applicationsDbName = "applications";
+const string adaptersDbName = "adapters";
+const string objectsDbName = "objects";
+const string internalObjectsDbName = "internal-objects";
+const string serialsDbName = "serials";
+
struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
{
bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
@@ -44,17 +49,33 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
}
};
-template<typename K, typename V> vector<V>
-toVector(const map<K,V>& m)
+template<typename K, typename V, typename C, typename Comp> vector<V>
+toVector(const Map<K, V, C, Comp>& m)
{
vector<V> v;
- for(typename map<K,V>::const_iterator p = m.begin(); p != m.end(); ++p)
+ for(typename Map<K, V, C, Comp>::const_iterator p = m.begin(); p != m.end(); ++p)
{
v.push_back(p->second);
}
return v;
}
+template<typename K, typename V, typename C, typename Comp> map<K, V>
+toMap(const Map<K, V, C, Comp>& d)
+{
+ std::map<K, V> m;
+ for(typename Map<K, V, C, Comp>::const_iterator p = d.begin(); p != d.end(); ++p)
+ {
+#ifdef __SUNPRO_CC
+ std::map<Key, Value>::value_type v(p->first, p->second);
+ m.insert(v);
+#else
+ m.insert(*p);
+#endif
+ }
+ return m;
+}
+
void
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
@@ -113,6 +134,73 @@ filterAdapterInfos(const string& filter,
infos.swap(filteredAdpts);
}
+Ice::Long
+getSerial(const Freeze::ConnectionPtr& connection, const string& dbName)
+{
+ SerialsDict dict(connection, serialsDbName);
+
+ //
+ // If a serial number is provided, juste update the serial number from the database,
+ // otherwise if the serial is 0, we increment the serial from the database.
+ //
+ SerialsDict::iterator p = dict.find(dbName);
+ if(p == dict.end())
+ {
+ dict.insert(SerialsDict::value_type(dbName, 1));
+ return 1;
+ }
+ return p->second;
+}
+
+Ice::Long
+updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice::Long serial = 0)
+{
+ if(serial == -1) // Master doesn't support serials.
+ {
+ return -1;
+ }
+
+ SerialsDict dict(connection, serialsDbName);
+
+ //
+ // If a serial number is provided, juste update the serial number from the database,
+ // otherwise if the serial is 0, we increment the serial from the database.
+ //
+ SerialsDict::iterator p = dict.find(dbName);
+ if(p == dict.end())
+ {
+ dict.insert(SerialsDict::value_type(dbName, serial == 0 ? 1 : serial));
+ return 1;
+ }
+ else
+ {
+ p.set(serial == 0 ? p->second + 1 : serial);
+ return p->second;
+ }
+}
+
+vector<AdapterInfo>
+findByReplicaGroupId(const StringAdapterInfoDict& dict, const string& name)
+{
+ vector<AdapterInfo> result;
+ for(StringAdapterInfoDict::const_iterator p = dict.findByReplicaGroupId(name, true); p != dict.end(); ++p)
+ {
+ result.push_back(p->second);
+ }
+ return result;
+}
+
+vector<ObjectInfo>
+findByType(const IdentityObjectInfoDict& dict, const string& type)
+{
+ vector<ObjectInfo> result;
+ for(IdentityObjectInfoDict::const_iterator p = dict.findByType(type); p != dict.end(); ++p)
+ {
+ result.push_back(p->second);
+ }
+ return result;
+}
+
}
Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
@@ -120,7 +208,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
const string& instanceName,
const TraceLevelsPtr& traceLevels,
const RegistryInfo& info,
- const DatabasePluginPtr& plugin,
+ const Freeze::ConnectionPtr& connection,
+ const string& envName,
bool readonly) :
_communicator(registryAdapter->getCommunicator()),
_internalAdapter(registryAdapter),
@@ -135,17 +224,18 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objectCache(_communicator),
_allocatableObjectCache(_communicator),
_serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
- _connectionPool(plugin->getConnectionPool()),
- _databasePlugin(plugin),
+ _connection(connection),
+ _envName(envName),
+ _applications(_connection, applicationsDbName),
+ _adapters(_connection, adaptersDbName),
+ _objects(_connection, objectsDbName),
+ _internalObjects(_connection, internalObjectsDbName),
_pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
_lock(0)
{
ServerEntrySeq entries;
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- map<string, ApplicationInfo> applications = applicationsWrapper->getMap();
- for(map<string, ApplicationInfo>::iterator p = applications.begin(); p != applications.end(); ++p)
+ for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p)
{
try
{
@@ -167,25 +257,15 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
- _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper);
- _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _connectionPool->getAdapters(connection));
- _objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection));
+ _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, toMap(_applications), getSerial(_connection, applicationsDbName));
+ _adapterObserverTopic = new AdapterObserverTopic(_topicManager, toMap(_adapters), getSerial(_connection, adaptersDbName));
+ _objectObserverTopic = new ObjectObserverTopic(_topicManager, toMap(_objects), getSerial(_connection, objectsDbName));
_registryObserverTopic->registryUp(info);
_pluginFacade->setDatabase(this);
}
-Database::~Database()
-{
- //
- // Release first the cache and then the plugin. This must be done in this order
- // to make sure the plugin is destroyed after the database cache.
- //
- _connectionPool = 0;
- _databasePlugin = 0;
-}
-
std::string
Database::getInstanceName() const
{
@@ -275,18 +355,16 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
map<string, ApplicationInfo> oldApplications;
for(;;)
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
try
{
- TransactionHolder txHolder(connection);
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- oldApplications = applicationsWrapper->getMap();
- applicationsWrapper->clear();
+ TransactionHolder txHolder(_connection);
+ oldApplications = toMap(_applications);
+ _applications.clear();
for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
{
- applicationsWrapper->put(p->descriptor.name, *p);
+ _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
}
- dbSerial = applicationsWrapper->updateSerial(dbSerial);
+ dbSerial = updateSerial(_connection, applicationsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -299,7 +377,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
halt(_communicator, ex);
}
}
-
+
ServerEntrySeq entries;
set<string> names;
@@ -357,17 +435,15 @@ Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
Lock sync(*this);
for(;;)
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
try
{
- TransactionHolder txHolder(connection);
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- adaptersWrapper->clear();
+ TransactionHolder txHolder(_connection);
+ _adapters.clear();
for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
{
- adaptersWrapper->put(r->id, *r);
+ _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
}
- dbSerial = adaptersWrapper->updateSerial(dbSerial);
+ dbSerial = updateSerial(_connection, adaptersDbName, dbSerial);
txHolder.commit();
break;
}
@@ -401,17 +477,15 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
Lock sync(*this);
for(;;)
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
try
{
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- objectsWrapper->clear();
+ TransactionHolder txHolder(_connection);
+ _objects.clear();
for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- objectsWrapper->put(q->proxy->ice_getIdentity(), *q);
+ _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
}
- dbSerial = objectsWrapper->updateSerial(dbSerial);
+ dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -443,11 +517,11 @@ Database::getApplications(Ice::Long& serial) const
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
TransactionHolder txHolder(connection);
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- serial = applicationsWrapper->getSerial();
- return toVector(applicationsWrapper->getMap());
+ StringApplicationInfoDict applications(connection, applicationsDbName);
+ serial = getSerial(connection, applicationsDbName);
+ return toVector(applications);
}
catch(const DeadlockException&)
{
@@ -467,11 +541,11 @@ Database::getAdapters(Ice::Long& serial) const
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
TransactionHolder txHolder(connection);
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- serial = adaptersWrapper->getSerial();
- return toVector(adaptersWrapper->getMap());
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
+ serial = getSerial(connection, adaptersDbName);
+ return toVector(adapters);
}
catch(const DeadlockException&)
{
@@ -491,11 +565,11 @@ Database::getObjects(Ice::Long& serial) const
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- serial = objectsWrapper->getSerial();
- return toVector(objectsWrapper->getMap());
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ serial = getSerial(connection, objectsDbName);
+ return toVector(objects);
}
catch(const DeadlockException&)
{
@@ -511,7 +585,7 @@ Database::getObjects(Ice::Long& serial) const
StringLongDict
Database::getSerials() const
{
- return _connectionPool->getSerials();
+ return toMap(SerialsDict(Freeze::createConnection(_communicator, _envName), serialsDbName));
}
void
@@ -528,20 +602,15 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
waitForUpdate(info.descriptor.name);
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- try
+ StringApplicationInfoDict::const_iterator i = _applications.find(info.descriptor.name);
+ if(i != _applications.end())
{
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- applicationsWrapper->find(info.descriptor.name);
throw DeploymentException("application `" + info.descriptor.name + "' already exists");
}
- catch(const NotFoundException&)
- {
- }
ApplicationHelper helper(_communicator, info.descriptor, true);
- checkForAddition(helper, connection);
- dbSerial = saveApplication(info, connection, dbSerial);
+ checkForAddition(helper, _connection);
+ dbSerial = saveApplication(info, _connection, dbSerial);
load(helper, entries, info.uuid, info.revision);
startUpdating(info.descriptor.name, info.uuid, info.revision);
@@ -589,7 +658,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
Lock sync(*this);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
- dbSerial = removeApplication(info.descriptor.name, _connectionPool->getConnection());
+ dbSerial = removeApplication(info.descriptor.name, _connection);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
@@ -635,16 +704,12 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
waitForUpdate(update.descriptor.name);
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- try
- {
- oldApp = applicationsWrapper->find(update.descriptor.name);
- }
- catch(const NotFoundException&)
+ StringApplicationInfoDict::const_iterator i = _applications.find(update.descriptor.name);
+ if(i == _applications.end())
{
throw ApplicationNotExistException(update.descriptor.name);
}
+ oldApp = i->second;
if(update.revision < 0)
{
@@ -680,16 +745,12 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
waitForUpdate(newDesc.name);
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- try
- {
- oldApp = applicationsWrapper->find(newDesc.name);
- }
- catch(const NotFoundException&)
+ StringApplicationInfoDict::const_iterator i = _applications.find(newDesc.name);
+ if(i == _applications.end())
{
throw ApplicationNotExistException(newDesc.name);
}
+ oldApp = i->second;
previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
helper.reset(new ApplicationHelper(_communicator, newDesc, true));
@@ -724,21 +785,18 @@ Database::instantiateServer(const string& application,
try
{
- Lock sync(*this);
+ Lock sync(*this);
checkSessionLock(session);
waitForUpdate(application);
-
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- try
- {
- oldApp = applicationsWrapper->find(application);
- }
- catch(const NotFoundException&)
+
+ StringApplicationInfoDict::const_iterator i = _applications.find(application);
+ if(i == _applications.end())
{
throw ApplicationNotExistException(application);
+
}
+ oldApp = i->second;
previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
@@ -772,17 +830,14 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
waitForUpdate(name);
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
ApplicationInfo appInfo;
- try
- {
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- appInfo = applicationsWrapper->find(name);
- }
- catch(const NotFoundException&)
+
+ StringApplicationInfoDict::const_iterator i = _applications.find(name);
+ if(i == _applications.end())
{
throw ApplicationNotExistException(name);
}
+ appInfo = i->second;
bool init = false;
try
@@ -800,7 +855,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
}
}
- dbSerial = removeApplication(name, connection, dbSerial);
+ dbSerial = removeApplication(name, _connection, dbSerial);
startUpdating(name, appInfo.uuid, appInfo.revision);
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
@@ -829,24 +884,22 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
ApplicationInfo
Database::getApplicationInfo(const std::string& name)
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- try
- {
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- return applicationsWrapper->find(name);
- }
- catch(const NotFoundException&)
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ StringApplicationInfoDict applications(connection, applicationsDbName);
+ StringApplicationInfoDict::const_iterator i = applications.find(name);
+ if(i == applications.end())
{
throw ApplicationNotExistException(name);
}
+ return i->second;
}
Ice::StringSeq
Database::getAllApplications(const string& expression)
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
- return getMatchingKeys<map<string, ApplicationInfo> >(applicationsWrapper->getMap(), expression);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ StringApplicationInfoDict applications(connection, applicationsDbName);
+ return getMatchingKeys<map<string, ApplicationInfo> >(toMap(applications), expression);
}
void
@@ -939,27 +992,29 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- try
- {
- adaptersWrapper->find(adapterId);
- updated = true;
- }
- catch(const NotFoundException&)
- {
- }
-
+ TransactionHolder txHolder(_connection);
+ StringAdapterInfoDict::iterator i = _adapters.find(adapterId);
if(proxy)
{
- adaptersWrapper->put(adapterId, info);
+ if(i == _adapters.end())
+ {
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
+ else
+ {
+ updated = true;
+ i.set(info);
+ }
}
else
{
- adaptersWrapper->erase(adapterId);
+ if(i == _adapters.end())
+ {
+ return;
+ }
+ _adapters.erase(i);
}
- dbSerial = adaptersWrapper->updateSerial(dbSerial);
+ dbSerial = updateSerial(_connection, adaptersDbName, dbSerial);
txHolder.commit();
break;
}
@@ -1007,18 +1062,16 @@ Ice::ObjectPrx
Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
const Ice::Context& ctx)
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- try
- {
- return adaptersWrapper->find(id).proxy;
- }
- catch(const NotFoundException&)
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
+ StringAdapterInfoDict::const_iterator i = adapters.find(id);
+ if(i != adapters.end())
{
+ return i->second.proxy;
}
Ice::EndpointSeq endpoints;
- vector<AdapterInfo> infos = adaptersWrapper->findByReplicaGroupId(id);
+ vector<AdapterInfo> infos = findByReplicaGroupId(adapters, id);
filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
for(unsigned int i = 0; i < infos.size(); ++i)
{
@@ -1059,17 +1112,15 @@ Database::removeAdapter(const string& adapterId)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- try
+ TransactionHolder txHolder(_connection);
+ StringAdapterInfoDict::iterator i = _adapters.find(adapterId);
+ if(i != _adapters.end())
{
- adaptersWrapper->find(adapterId);
- adaptersWrapper->erase(adapterId);
+ _adapters.erase(i);
}
- catch(const NotFoundException&)
+ else
{
- infos = adaptersWrapper->findByReplicaGroupId(adapterId);
+ infos = findByReplicaGroupId(_adapters, adapterId);
if(infos.empty())
{
throw AdapterNotExistException(adapterId);
@@ -1077,10 +1128,10 @@ Database::removeAdapter(const string& adapterId)
for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p)
{
p->replicaGroupId.clear();
- adaptersWrapper->put(p->id, *p);
+ _adapters.put(StringAdapterInfoDict::value_type(p->id, *p));
}
}
- dbSerial = adaptersWrapper->updateSerial();
+ dbSerial = updateSerial(_connection, adaptersDbName);
txHolder.commit();
break;
}
@@ -1202,20 +1253,21 @@ Database::getAdapterInfo(const string& id)
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
AdapterInfoSeq infos;
- try
+ StringAdapterInfoDict::const_iterator i = adapters.find(id);
+ if(i != adapters.end())
{
- infos.push_back(adaptersWrapper->find(id));
+ infos.push_back(i->second);
}
- catch(const NotFoundException&)
+ else
{
//
// If it's not a regular object adapter, perhaps it's a replica
// group...
//
- infos = adaptersWrapper->findByReplicaGroupId(id);
+ infos = findByReplicaGroupId(adapters, id);
if(infos.empty())
{
throw AdapterNotExistException(id);
@@ -1257,20 +1309,21 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
AdapterInfoSeq infos;
- try
+ StringAdapterInfoDict::const_iterator i = adapters.find(id);
+ if(i != adapters.end())
{
- infos.push_back(adaptersWrapper->find(id));
+ infos.push_back(i->second);
}
- catch(const NotFoundException&)
+ else
{
//
// If it's not a regular object adapter, perhaps it's a replica
// group...
//
- infos = adaptersWrapper->findByReplicaGroupId(id);
+ infos = findByReplicaGroupId(adapters, id);
if(infos.empty())
{
throw AdapterNotExistException(id);
@@ -1339,10 +1392,7 @@ Database::getAllAdapters(const string& expression)
result.swap(ids);
set<string> groups;
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- map<string, AdapterInfo> adapters = adaptersWrapper->getMap();
- for(map<string, AdapterInfo>::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+ for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
if(expression.empty() || IceUtilInternal::match(p->first, expression, true))
{
@@ -1385,19 +1435,14 @@ Database::addObject(const ObjectInfo& info)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- try
+ TransactionHolder txHolder(_connection);
+ IdentityObjectInfoDict::const_iterator i = _objects.find(id);
+ if(i != _objects.end())
{
- objectsWrapper->find(id);
throw ObjectExistsException(id);
}
- catch(const NotFoundException&)
- {
- }
- objectsWrapper->put(id, info);
- dbSerial = objectsWrapper->updateSerial();
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+ dbSerial = updateSerial(_connection, objectsDbName);
txHolder.commit();
break;
}
@@ -1442,19 +1487,18 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- try
+ TransactionHolder txHolder(_connection);
+ IdentityObjectInfoDict::iterator i = _objects.find(id);
+ if(i != _objects.end())
{
- objectsWrapper->find(id);
update = true;
+ i.set(info);
}
- catch(const NotFoundException&)
+ else
{
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
}
- objectsWrapper->put(id, info);
- dbSerial = objectsWrapper->updateSerial(dbSerial);
+ dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -1508,22 +1552,17 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- try
- {
- objectsWrapper->find(id);
- }
- catch(const NotFoundException&)
+ TransactionHolder txHolder(_connection);
+ IdentityObjectInfoDict::iterator i = _objects.find(id);
+ if(i == _objects.end())
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- objectsWrapper->erase(id);
- dbSerial = objectsWrapper->updateSerial(dbSerial);
+ _objects.erase(i);
+ dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -1574,23 +1613,18 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- try
- {
- info = objectsWrapper->find(id);
- }
- catch(const NotFoundException&)
+ TransactionHolder txHolder(_connection);
+ IdentityObjectInfoDict::iterator i = _objects.find(id);
+ if(i == _objects.end())
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
-
+ info = i->second;
info.proxy = proxy;
- objectsWrapper->put(id, info);
- dbSerial = objectsWrapper->updateSerial();
+ i.set(info);
+ dbSerial = updateSerial(_connection, objectsDbName);
txHolder.commit();
break;
}
@@ -1605,7 +1639,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
serial = _objectObserverTopic->objectUpdated(dbSerial, info);
-
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
@@ -1623,12 +1656,10 @@ Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
+ TransactionHolder txHolder(_connection);
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- objectsWrapper->put(p->proxy->ice_getIdentity(), *p);
+ _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
}
txHolder.commit();
break;
@@ -1653,12 +1684,10 @@ Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
+ TransactionHolder txHolder(_connection);
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- objectsWrapper->erase(p->proxy->ice_getIdentity());
+ _objects.erase(p->proxy->ice_getIdentity());
}
txHolder.commit();
break;
@@ -1689,18 +1718,16 @@ Database::getObjectProxy(const Ice::Identity& id)
{
}
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- try
- {
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- return objectsWrapper->find(id).proxy;
- }
- catch(const NotFoundException&)
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ IdentityObjectInfoDict::const_iterator i = objects.find(id);
+ if(i == objects.end())
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
+ return i->second.proxy;
}
Ice::ObjectPrx
@@ -1752,9 +1779,9 @@ Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, co
{
Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- vector<ObjectInfo> infos = objectsWrapper->findByType(type);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ vector<ObjectInfo> infos = findByType(objects, type);
for(unsigned int i = 0; i < infos.size(); ++i)
{
proxies.push_back(infos[i].proxy);
@@ -1786,16 +1813,14 @@ Database::getObjectInfo(const Ice::Identity& id)
{
}
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- try
- {
- return objectsWrapper->find(id);
- }
- catch(const NotFoundException&)
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ IdentityObjectInfoDict::const_iterator i = objects.find(id);
+ if(i == objects.end())
{
throw ObjectNotRegisteredException(id);
}
+ return i->second;
}
ObjectInfoSeq
@@ -1803,10 +1828,9 @@ Database::getAllObjectInfos(const string& expression)
{
ObjectInfoSeq infos = _objectCache.getAll(expression);
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- map<Ice::Identity, ObjectInfo> objects = objectsWrapper->getMap();
- for(map<Ice::Identity, ObjectInfo>::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true))
{
@@ -1821,9 +1845,9 @@ Database::getObjectInfosByType(const string& type)
{
ObjectInfoSeq infos = _objectCache.getAllByType(type);
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- ObjectInfoSeq dbInfos = objectsWrapper->findByType(type);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
+ ObjectInfoSeq dbInfos = findByType(objects, type);
for(unsigned int i = 0; i < dbInfos.size(); ++i)
{
infos.push_back(dbInfos[i]);
@@ -1834,28 +1858,23 @@ Database::getObjectInfosByType(const string& type)
void
Database::addInternalObject(const ObjectInfo& info, bool replace)
{
- Lock sync(*this);
+ Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
for(;;)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection);
+ TransactionHolder txHolder(_connection);
if(!replace)
{
- try
+ IdentityObjectInfoDict::const_iterator i = _internalObjects.find(id);
+ if(i != _internalObjects.end())
{
- internalObjectsWrapper->find(id);
throw ObjectExistsException(id);
}
- catch(const NotFoundException&)
- {
- }
}
- internalObjectsWrapper->put(id, info);
+ _internalObjects.put(IdentityObjectInfoDict::value_type(id, info));
txHolder.commit();
break;
}
@@ -1866,7 +1885,7 @@ Database::addInternalObject(const ObjectInfo& info, bool replace)
catch(const DatabaseException& ex)
{
halt(_communicator, ex);
- }
+ }
}
}
@@ -1879,20 +1898,15 @@ Database::removeInternalObject(const Ice::Identity& id)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
- TransactionHolder txHolder(connection);
- ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection);
- try
- {
- internalObjectsWrapper->find(id);
- }
- catch(const NotFoundException&)
+ TransactionHolder txHolder(_connection);
+ IdentityObjectInfoDict::iterator i = _internalObjects.find(id);
+ if(i == _internalObjects.end())
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- internalObjectsWrapper->erase(id);
+ _internalObjects.erase(i);
txHolder.commit();
break;
}
@@ -1912,9 +1926,9 @@ Database::getInternalObjectsByType(const string& type)
{
Ice::ObjectProxySeq proxies;
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
- ObjectsWrapperPtr internalObjectsWrapper = _connectionPool->getInternalObjects(connection);
- vector<ObjectInfo> infos = internalObjectsWrapper->findByType(type);
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict internalObjects(connection, internalObjectsDbName);
+ vector<ObjectInfo> infos = findByType(internalObjects, type);
for(unsigned int i = 0; i < infos.size(); ++i)
{
proxies.push_back(infos[i].proxy);
@@ -1923,7 +1937,7 @@ Database::getInternalObjectsByType(const string& type)
}
void
-Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectionPtr& connection)
+Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& connection)
{
set<string> serverIds;
set<string> adapterIds;
@@ -1934,18 +1948,18 @@ Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectio
for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition));
if(!adapterIds.empty())
{
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
{
- checkAdapterForAddition(*p, adaptersWrapper);
+ checkAdapterForAddition(*p, adapters);
}
}
if(!objectIds.empty())
{
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p)
{
- checkObjectForAddition(*p, objectsWrapper);
+ checkObjectForAddition(*p, objects);
}
}
@@ -1958,7 +1972,7 @@ Database::checkForAddition(const ApplicationHelper& app, const DatabaseConnectio
void
Database::checkForUpdate(const ApplicationHelper& origApp,
const ApplicationHelper& newApp,
- const DatabaseConnectionPtr& connection)
+ const ConnectionPtr& connection)
{
set<string> oldSvrs, newSvrs;
set<string> oldAdpts, newAdpts;
@@ -1975,10 +1989,10 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts));
if(!addedAdpts.empty())
{
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ StringAdapterInfoDict adapters(connection, adaptersDbName);
for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p)
{
- checkAdapterForAddition(*p, adaptersWrapper);
+ checkAdapterForAddition(*p, adapters);
}
}
@@ -1986,10 +2000,10 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs));
if(!addedObjs.empty())
{
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
+ IdentityObjectInfoDict objects(connection, objectsDbName);
for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p)
{
- checkObjectForAddition(*p, objectsWrapper);
+ checkObjectForAddition(*p, objects);
}
}
@@ -2039,7 +2053,7 @@ Database::checkServerForAddition(const string& id)
}
void
-Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& adaptersWrapper)
+Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& adapters)
{
bool found = false;
if(_adapterCache.has(id))
@@ -2048,14 +2062,14 @@ Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& ad
}
else
{
- try
+ StringAdapterInfoDict::const_iterator i = adapters.find(id);
+ if(i != adapters.end())
{
- adaptersWrapper->find(id);
found = true;
}
- catch(const NotFoundException&)
+ else
{
- if(adaptersWrapper->findByReplicaGroupId(id).size() != 0)
+ if(!findByReplicaGroupId(adapters, id).empty())
{
found = true;
}
@@ -2071,7 +2085,7 @@ Database::checkAdapterForAddition(const string& id, const AdaptersWrapperPtr& ad
}
void
-Database::checkObjectForAddition(const Ice::Identity& objectId, const ObjectsWrapperPtr& objectsWrapper)
+Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityObjectInfoDict& objects)
{
bool found = false;
if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId))
@@ -2080,14 +2094,11 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const ObjectsWra
}
else
{
- try
+ IdentityObjectInfoDict::const_iterator i = objects.find(objectId);
+ if(i != objects.end())
{
- objectsWrapper->find(objectId);
found = true;
}
- catch(const NotFoundException&)
- {
- }
}
if(found)
@@ -2323,17 +2334,17 @@ Database::reload(const ApplicationHelper& oldApp,
}
Ice::Long
-Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection, Ice::Long dbSerial)
+Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& connection, Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
for(;;)
{
try
{
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
+ StringApplicationInfoDict applications(connection, applicationsDbName);
TransactionHolder txHolder(connection);
- applicationsWrapper->put(info.descriptor.name, info);
- dbSerial = applicationsWrapper->updateSerial(dbSerial);
+ applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
+ dbSerial = updateSerial(connection, applicationsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -2350,17 +2361,17 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP
}
Ice::Long
-Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection, Ice::Long dbSerial)
+Database::removeApplication(const string& name, const ConnectionPtr& connection, Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
for(;;)
{
try
{
- ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
+ StringApplicationInfoDict applications(connection, applicationsDbName);
TransactionHolder txHolder(connection);
- applicationsWrapper->erase(name);
- dbSerial = applicationsWrapper->updateSerial(dbSerial);
+ applications.erase(name);
+ dbSerial = updateSerial(connection, applicationsDbName, dbSerial);
txHolder.commit();
break;
}
@@ -2560,7 +2571,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
Ice::Long dbSerial)
{
const ApplicationDescriptor& newDesc = helper.getDefinition();
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
ServerEntrySeq entries;
int serial = 0;