summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
committerDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
commit7d20430028f05cc26c412465176a75ce4ea5af9e (patch)
tree593695acf366f7e3a7081d15af8f474683ce4af7 /cpp/src/IceGrid/Database.cpp
parentRemoved unused __checkTwoway(const char*) from Proxy (diff)
downloadice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.bz2
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.xz
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.zip
Bug 3231 - alternative storage for IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp517
1 files changed, 347 insertions, 170 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index dc87fad3849..65379e1dff7 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -9,7 +9,6 @@
#include <IceUtil/StringUtil.h>
#include <IceUtil/Random.h>
-#include <Freeze/Freeze.h>
#include <IceGrid/Database.h>
#include <IceGrid/TraceLevels.h>
#include <IceGrid/Util.h>
@@ -18,6 +17,14 @@
#include <IceGrid/ReplicaSessionI.h>
#include <IceGrid/Session.h>
#include <IceGrid/Topics.h>
+#include <IceGrid/DatabaseWrapper.h>
+#ifdef QTSQL
+# include <IceUtil/Functional.h>
+# include <Ice/Communicator.h>
+# include <Ice/Instance.h>
+# include <Ice/ObjectAdapter.h>
+# include <Ice/LoggerUtil.h>
+#endif
#include <algorithm>
#include <functional>
@@ -26,10 +33,11 @@
using namespace std;
using namespace IceGrid;
-const string Database::_applicationDbName = "applications";
-const string Database::_adapterDbName = "adapters";
-const string Database::_objectDbName = "objects";
-const string Database::_internalObjectDbName = "internal-objects";
+#ifdef QTSQL
+using namespace IceSQL;
+#else
+using namespace Freeze;
+#endif
namespace IceGrid
{
@@ -88,16 +96,20 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objectCache(_communicator),
_allocatableObjectCache(_communicator),
_serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
- _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)),
- _applications(_connection, _applicationDbName),
- _adapters(_connection, _adapterDbName),
- _objects(_connection, _objectDbName),
- _internalObjects(_connection, _internalObjectDbName),
+ _databaseCache(new IceGrid::DatabaseCache(_communicator, _envName, _instanceName, info.name)),
_lock(0),
_applicationSerial(0)
{
ServerEntrySeq entries;
- for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p)
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+#ifdef QTSQL
+ StringApplicationInfoDict applications = applicationsWrapper.getMap();
+#else
+ StringApplicationInfoDict& applications = applicationsWrapper.getMap();
+#endif
+ for(StringApplicationInfoDict::iterator p = applications.begin(); p != applications.end(); ++p)
{
try
{
@@ -119,9 +131,13 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
- _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, _applications);
- _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _adapters);
- _objectObserverTopic = new ObjectObserverTopic(_topicManager, _objects);
+ _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper.getMap());
+
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper.getMap());
+
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper.getMap());
_registryObserverTopic->registryUp(info);
}
@@ -207,27 +223,31 @@ Database::unlock(AdminSessionI* session)
}
void
-Database::syncApplications(const ApplicationInfoSeq& applications)
+Database::syncApplications(const ApplicationInfoSeq& newApplications)
{
int serial = 0; // Initialize to prevent warning.
{
Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
ServerEntrySeq entries;
set<string> names;
- for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
+
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
{
try
{
- StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
- if(s != _applications.end())
+ try
{
- ApplicationHelper previous(_communicator, s->second.descriptor);
+ ApplicationInfo info = applicationsWrapper.find(p->descriptor.name);
+ ApplicationHelper previous(_communicator, info.descriptor);
ApplicationHelper helper(_communicator, p->descriptor);
reload(previous, helper, entries, p->uuid, p->revision);
}
- else
+ catch(const NotFoundException&)
{
load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
}
@@ -237,27 +257,28 @@ Database::syncApplications(const ApplicationInfoSeq& applications)
Ice::Warning warn(_traceLevels->logger);
warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
}
- _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
+ applicationsWrapper.put(p->descriptor.name, *p);
names.insert(p->descriptor.name);
}
- StringApplicationInfoDict::iterator s = _applications.begin();
- while(s != _applications.end())
+#ifdef QTSQL
+ StringApplicationInfoDict applications = applicationsWrapper.getMap();
+#else
+ StringApplicationInfoDict& applications = applicationsWrapper.getMap();
+#endif
+ StringApplicationInfoDict::iterator s = applications.begin();
+ while(s != applications.end())
{
if(names.find(s->first) == names.end())
{
unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
- _applications.erase(s++);
- }
- else
- {
- ++s;
+ applicationsWrapper.erase(s->first);
}
+ ++s;
}
++_applicationSerial;
- serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications);
-
+ serial = _applicationObserverTopic->applicationInit(_applicationSerial, newApplications);
txHolder.commit();
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
@@ -269,11 +290,15 @@ Database::syncAdapters(const AdapterInfoSeq& adapters)
int serial;
{
Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _adapters.clear();
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ adaptersWrapper.clear();
for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
{
- _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ adaptersWrapper.put(*r);
}
serial = _adapterObserverTopic->adapterInit(adapters);
txHolder.commit();
@@ -287,11 +312,16 @@ Database::syncObjects(const ObjectInfoSeq& objects)
int serial;
{
Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _objects.clear();
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+
+ objectsWrapper.clear();
for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
+ objectsWrapper.put(q->proxy->ice_getIdentity(), *q);
}
serial = _objectObserverTopic->objectInit(objects);
txHolder.commit();
@@ -302,6 +332,9 @@ Database::syncObjects(const ObjectInfoSeq& objects)
void
Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+
ServerEntrySeq entries;
{
Lock sync(*this);
@@ -309,10 +342,14 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
waitForUpdate(info.descriptor.name);
- if(_applications.find(info.descriptor.name) != _applications.end())
+ try
{
+ applicationsWrapper.find(info.descriptor.name);
throw DeploymentException("application `" + info.descriptor.name + "' already exists");
- }
+ }
+ catch(const NotFoundException&)
+ {
+ }
ApplicationHelper helper(_communicator, info.descriptor, true);
checkForAddition(helper);
@@ -347,9 +384,9 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
int serial;
{
Lock sync(*this);
- ++_applicationSerial;
- _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
+ ++_applicationSerial;
+ applicationsWrapper.put(info.descriptor.name, info);
serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info);
if(_traceLevels->application > 0)
@@ -377,12 +414,16 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
waitForUpdate(update.descriptor.name);
- StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name);
- if(p == _applications.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ try
+ {
+ oldApp = applicationsWrapper.find(update.descriptor.name);
+ }
+ catch(const NotFoundException&)
{
throw ApplicationNotExistException(update.descriptor.name);
}
- oldApp = p->second;
if(update.revision < 0)
{
@@ -415,12 +456,16 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
waitForUpdate(newDesc.name);
- StringApplicationInfoDict::const_iterator p = _applications.find(newDesc.name);
- if(p == _applications.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ try
+ {
+ oldApp = applicationsWrapper.find(newDesc.name);
+ }
+ catch(const NotFoundException&)
{
throw ApplicationNotExistException(newDesc.name);
}
- oldApp = p->second;
ApplicationHelper previous(_communicator, oldApp.descriptor);
ApplicationHelper helper(_communicator, newDesc, true);
@@ -455,12 +500,16 @@ Database::instantiateServer(const string& application,
waitForUpdate(application);
- StringApplicationInfoDict::const_iterator p = _applications.find(application);
- if(p == _applications.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ try
+ {
+ oldApp = applicationsWrapper.find(application);
+ }
+ catch(const NotFoundException&)
{
throw ApplicationNotExistException(application);
}
- oldApp = p->second;
ApplicationHelper previous(_communicator, oldApp.descriptor);
ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true);
@@ -484,6 +533,9 @@ Database::instantiateServer(const string& application,
void
Database::removeApplication(const string& name, AdminSessionI* session)
{
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+
ServerEntrySeq entries;
int serial;
{
@@ -492,8 +544,12 @@ Database::removeApplication(const string& name, AdminSessionI* session)
waitForUpdate(name);
- StringApplicationInfoDict::iterator p = _applications.find(name);
- if(p == _applications.end())
+ ApplicationInfo appInfo;
+ try
+ {
+ appInfo = applicationsWrapper.find(name);
+ }
+ catch(const NotFoundException&)
{
throw ApplicationNotExistException(name);
}
@@ -501,7 +557,7 @@ Database::removeApplication(const string& name, AdminSessionI* session)
bool init = false;
try
{
- ApplicationHelper helper(_communicator, p->second.descriptor);
+ ApplicationHelper helper(_communicator, appInfo.descriptor);
init = true;
checkForRemove(helper);
unload(helper, entries);
@@ -520,7 +576,7 @@ Database::removeApplication(const string& name, AdminSessionI* session)
//
}
- startUpdating(name, p->second.uuid, p->second.revision);
+ startUpdating(name, appInfo.uuid, appInfo.revision);
}
if(_master)
@@ -531,7 +587,7 @@ Database::removeApplication(const string& name, AdminSessionI* session)
{
Lock sync(*this);
- _applications.erase(name);
+ applicationsWrapper.erase(name);
++_applicationSerial;
serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
@@ -551,24 +607,24 @@ Database::removeApplication(const string& name, AdminSessionI* session)
ApplicationInfo
Database::getApplicationInfo(const std::string& name)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringApplicationInfoDict descriptors(connection, _applicationDbName);
-
- StringApplicationInfoDict::const_iterator p = descriptors.find(name);
- if(p == descriptors.end())
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ try
+ {
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ return applicationsWrapper.find(name);
+ }
+ catch(const NotFoundException&)
{
throw ApplicationNotExistException(name);
}
-
- return p->second;
}
Ice::StringSeq
Database::getAllApplications(const string& expression)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringApplicationInfoDict descriptors(connection, _applicationDbName);
- return getMatchingKeys<StringApplicationInfoDict>(descriptors, expression);
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ return getMatchingKeys<StringApplicationInfoDict>(applicationsWrapper.getMap(), expression);
}
void
@@ -648,17 +704,27 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
throw AdapterExistsException(adapterId);
}
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
AdapterInfo info;
+ bool found = false;
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ try
+ {
+ info = adaptersWrapper.find(adapterId);
+ found = true;
+ }
+ catch(const NotFoundException&)
+ {
+ }
bool updated = false;
if(proxy)
{
- if(p != _adapters.end())
+ if(found)
{
- info = p->second;
info.proxy = proxy;
info.replicaGroupId = replicaGroupId;
- p.set(info);
+ adaptersWrapper.put(info);
updated = true;
}
else
@@ -666,16 +732,16 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
info.id = adapterId;
info.proxy = proxy;
info.replicaGroupId = replicaGroupId;
- _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ adaptersWrapper.put(info);
}
}
else
{
- if(p == _adapters.end())
+ if(!found)
{
return;
}
- _adapters.erase(p);
+ adaptersWrapper.erase(adapterId);
}
if(_traceLevels->adapter > 0)
@@ -710,18 +776,21 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
Ice::ObjectPrx
Database::getAdapterDirectProxy(const string& id)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- StringAdapterInfoDict::const_iterator p = adapters.find(id);
- if(p != adapters.end())
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ try
+ {
+ return adaptersWrapper.find(id).proxy;
+ }
+ catch(const NotFoundException&)
{
- return p->second.proxy;
}
Ice::EndpointSeq endpoints;
- for(p = adapters.findByReplicaGroupId(id, true); p != adapters.end(); ++p)
+ vector<AdapterInfo> infos = adaptersWrapper.findByReplicaGroupId(id);
+ for(unsigned int i = 0; i < infos.size(); ++i)
{
- Ice::EndpointSeq edpts = p->second.proxy->ice_getEndpoints();
+ Ice::EndpointSeq edpts = infos[i].proxy->ice_getEndpoints();
endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
}
if(!endpoints.empty())
@@ -747,29 +816,28 @@ Database::removeAdapter(const string& adapterId)
throw ex;
}
- Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
-
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+
AdapterInfoSeq infos;
- if(p != _adapters.end())
+ try
{
- _adapters.erase(p);
+ adaptersWrapper.find(adapterId);
+ adaptersWrapper.erase(adapterId);
}
- else
+ catch(const NotFoundException&)
{
- p = _adapters.findByReplicaGroupId(adapterId, true);
- if(p == _adapters.end())
+ infos = adaptersWrapper.findByReplicaGroupId(adapterId);
+ if(infos.size() == 0)
{
throw AdapterNotExistException(adapterId);
}
-
- while(p != _adapters.end())
+ for(unsigned int i = 0; i < infos.size(); ++i)
{
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- infos.push_back(info);
- _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
+ infos[i].replicaGroupId = "";
+ adaptersWrapper.put(infos[i]);
}
}
@@ -790,7 +858,6 @@ Database::removeAdapter(const string& adapterId)
serial = _adapterObserverTopic->adapterUpdated(*p);
}
}
-
txHolder.commit();
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
@@ -836,33 +903,26 @@ Database::getAdapterInfo(const string& id)
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- StringAdapterInfoDict::const_iterator p = adapters.find(id);
- if(p != adapters.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ AdapterInfoSeq infos;
+ try
{
- AdapterInfoSeq infos;
- infos.push_back(p->second);
- return infos;
+ infos.push_back(adaptersWrapper.find(id));
}
-
- //
- // If it's not a regular object adapter, perhaps it's a replica
- // group...
- //
- p = adapters.findByReplicaGroupId(id, true);
- if(p != adapters.end())
+ catch(const NotFoundException&)
{
- AdapterInfoSeq infos;
- while(p != adapters.end())
+ //
+ // If it's not a regular object adapter, perhaps it's a replica
+ // group...
+ //
+ infos = adaptersWrapper.findByReplicaGroupId(id);
+ if(infos.size() == 0)
{
- infos.push_back(p->second);
- ++p;
+ throw AdapterNotExistException(id);
}
- return infos;
}
-
- throw AdapterNotExistException(id);
+ return infos;
}
@@ -874,7 +934,15 @@ Database::getAllAdapters(const string& expression)
vector<string> ids = _adapterCache.getAll(expression);
result.swap(ids);
set<string> groups;
- for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+#ifdef QTSQL
+ StringAdapterInfoDict adapters = adaptersWrapper.getMap();
+#else
+ StringAdapterInfoDict& adapters = adaptersWrapper.getMap();
+#endif
+ for(StringAdapterInfoDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
if(expression.empty() || IceUtilInternal::match(p->first, expression, true))
{
@@ -910,11 +978,17 @@ Database::addObject(const ObjectInfo& info)
throw ObjectExistsException(id);
}
- if(_objects.find(id) != _objects.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ try
{
+ objectsWrapper.find(id);
throw ObjectExistsException(id);
}
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
+ catch(const NotFoundException&)
+ {
+ }
+ objectsWrapper.put(id, info);
serial = _objectObserverTopic->objectAdded(info);
@@ -940,8 +1014,18 @@ Database::addOrUpdateObject(const ObjectInfo& info)
throw ObjectExistsException(id);
}
- bool update = _objects.find(id) != _objects.end();
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
+ bool update = false;
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ try
+ {
+ objectsWrapper.find(id);
+ update = true;
+ }
+ catch(const NotFoundException&)
+ {
+ }
+ objectsWrapper.put(id, info);
if(update)
{
@@ -977,14 +1061,20 @@ Database::removeObject(const Ice::Identity& id)
throw ex;
}
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ try
+ {
+ objectsWrapper.find(id);
+ }
+ catch(const NotFoundException&)
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- _objects.erase(p);
+
+ objectsWrapper.erase(id);
serial = _objectObserverTopic->objectRemoved(id);
@@ -1015,18 +1105,23 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
throw ex;
}
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+
+ ObjectInfo info;
+ try
+ {
+ info = objectsWrapper.find(id);
+ }
+ catch(const NotFoundException&)
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- ObjectInfo info;
- info = p->second;
info.proxy = proxy;
- p.set(info);
+ objectsWrapper.put(id, info);
serial = _objectObserverTopic->objectUpdated(info);
@@ -1043,12 +1138,17 @@ int
Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ objectsWrapper.put(p->proxy->ice_getIdentity(), *p);
}
int serial = _objectObserverTopic->objectsAddedOrUpdated(objects);
+
txHolder.commit();
return serial;
}
@@ -1057,12 +1157,17 @@ void
Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txHolder(connection);
+
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- _objects.erase(p->proxy->ice_getIdentity());
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ objectsWrapper.erase(p->proxy->ice_getIdentity());
}
_objectObserverTopic->objectsRemoved(objects);
+
txHolder.commit();
}
@@ -1080,16 +1185,18 @@ Database::getObjectProxy(const Ice::Identity& id)
{
}
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- IdentityObjectInfoDict::const_iterator p = objects.find(id);
- if(p == objects.end())
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ try
+ {
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ return objectsWrapper.find(id).proxy;
+ }
+ catch(const NotFoundException&)
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- return p->second.proxy;
}
Ice::ObjectPrx
@@ -1139,11 +1246,13 @@ Ice::ObjectProxySeq
Database::getObjectsByType(const string& type)
{
Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
+
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ vector<ObjectInfo> infos = objectsWrapper.findByType(type);
+ for(unsigned int i = 0; i < infos.size(); ++i)
{
- proxies.push_back(p->second.proxy);
+ proxies.push_back(infos[i].proxy);
}
return proxies;
}
@@ -1160,22 +1269,30 @@ Database::getObjectInfo(const Ice::Identity& id)
{
}
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- IdentityObjectInfoDict::const_iterator p = objects.find(id);
- if(p == objects.end())
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ try
+ {
+ return objectsWrapper.find(id);
+ }
+ catch(const NotFoundException&)
{
throw ObjectNotRegisteredException(id);
}
- return p->second;
}
ObjectInfoSeq
Database::getAllObjectInfos(const string& expression)
{
ObjectInfoSeq infos = _objectCache.getAll(expression);
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
+
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+#ifdef QTSQL
+ IdentityObjectInfoDict objects = objectsWrapper.getMap();
+#else
+ IdentityObjectInfoDict& objects = objectsWrapper.getMap();
+#endif
for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true))
@@ -1190,11 +1307,13 @@ ObjectInfoSeq
Database::getObjectInfosByType(const string& type)
{
ObjectInfoSeq infos = _objectCache.getAllByType(type);
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
+
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ ObjectInfoSeq dbInfos = objectsWrapper.findByType(type);
+ for(unsigned int i = 0; i < dbInfos.size(); ++i)
{
- infos.push_back(p->second);
+ infos.push_back(dbInfos[i]);
}
return infos;
}
@@ -1204,36 +1323,54 @@ Database::addInternalObject(const ObjectInfo& info, bool replace)
{
Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
- if(!replace && _internalObjects.find(id) != _internalObjects.end())
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection);
+ if(!replace)
{
- throw ObjectExistsException(id);
+ try
+ {
+ internalObjectsWrapper.find(id);
+ throw ObjectExistsException(id);
+ }
+ catch(const NotFoundException&)
+ {
+ }
}
- _internalObjects.put(IdentityObjectInfoDict::value_type(id, info));
+ internalObjectsWrapper.put(id, info);
}
void
Database::removeInternalObject(const Ice::Identity& id)
{
Lock sync(*this);
- IdentityObjectInfoDict::iterator p = _internalObjects.find(id);
- if(p == _internalObjects.end())
+
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection);
+ try
+ {
+ internalObjectsWrapper.find(id);
+ }
+ catch(const NotFoundException&)
{
ObjectNotRegisteredException ex;
ex.id = id;
throw ex;
}
- _internalObjects.erase(p);
+ internalObjectsWrapper.erase(id);
}
Ice::ObjectProxySeq
Database::getInternalObjectsByType(const string& type)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict internalObjects(connection, _internalObjectDbName);
Ice::ObjectProxySeq proxies;
- for(IdentityObjectInfoDict::const_iterator p = internalObjects.findByType(type); p != internalObjects.end(); ++p)
+
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
+ InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection);
+ vector<ObjectInfo> infos = internalObjectsWrapper.findByType(type);
+ for(unsigned int i = 0; i < infos.size(); ++i)
{
- proxies.push_back(p->second.proxy);
+ proxies.push_back(infos[i].proxy);
}
return proxies;
}
@@ -1327,9 +1464,30 @@ Database::checkServerForAddition(const string& id)
void
Database::checkAdapterForAddition(const string& id)
{
- if(_adapterCache.has(id) ||
- _adapters.find(id) != _adapters.end() ||
- _adapters.findByReplicaGroupId(id) != _adapters.end())
+ bool found = false;
+ if(_adapterCache.has(id))
+ {
+ found = true;
+ }
+ else
+ {
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ AdaptersDictWrapper adaptersWrapper(_databaseCache, connection);
+ try
+ {
+ adaptersWrapper.find(id);
+ found = true;
+ }
+ catch(const NotFoundException&)
+ {
+ if(adaptersWrapper.findByReplicaGroupId(id).size() != 0)
+ {
+ found = true;
+ }
+ }
+ }
+
+ if(found)
{
DeploymentException ex;
ex.reason = "adapter `" + id + "' is already registered";
@@ -1340,9 +1498,26 @@ Database::checkAdapterForAddition(const string& id)
void
Database::checkObjectForAddition(const Ice::Identity& objectId)
{
- if(_objectCache.has(objectId) ||
- _allocatableObjectCache.has(objectId) ||
- _objects.find(objectId) != _objects.end())
+ bool found = false;
+ if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId))
+ {
+ found = true;
+ }
+ else
+ {
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ObjectsDictWrapper objectsWrapper(_databaseCache, connection);
+ try
+ {
+ objectsWrapper.find(objectId);
+ found = true;
+ }
+ catch(const NotFoundException&)
+ {
+ }
+ }
+
+ if(found)
{
DeploymentException ex;
ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered";
@@ -1642,7 +1817,9 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
info.revision = update.revision;
info.descriptor = newDesc;
- _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection);
+ applicationsWrapper.put(update.descriptor.name, info);
++_applicationSerial;
if(_traceLevels->application > 0)