diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2009-09-28 11:05:44 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2009-09-28 11:05:44 -0230 |
commit | 7d20430028f05cc26c412465176a75ce4ea5af9e (patch) | |
tree | 593695acf366f7e3a7081d15af8f474683ce4af7 /cpp/src/IceGrid/Database.cpp | |
parent | Removed unused __checkTwoway(const char*) from Proxy (diff) | |
download | ice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.bz2 ice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.xz ice-7d20430028f05cc26c412465176a75ce4ea5af9e.zip |
Bug 3231 - alternative storage for IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 517 |
1 files changed, 347 insertions, 170 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index dc87fad3849..65379e1dff7 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -9,7 +9,6 @@ #include <IceUtil/StringUtil.h> #include <IceUtil/Random.h> -#include <Freeze/Freeze.h> #include <IceGrid/Database.h> #include <IceGrid/TraceLevels.h> #include <IceGrid/Util.h> @@ -18,6 +17,14 @@ #include <IceGrid/ReplicaSessionI.h> #include <IceGrid/Session.h> #include <IceGrid/Topics.h> +#include <IceGrid/DatabaseWrapper.h> +#ifdef QTSQL +# include <IceUtil/Functional.h> +# include <Ice/Communicator.h> +# include <Ice/Instance.h> +# include <Ice/ObjectAdapter.h> +# include <Ice/LoggerUtil.h> +#endif #include <algorithm> #include <functional> @@ -26,10 +33,11 @@ using namespace std; using namespace IceGrid; -const string Database::_applicationDbName = "applications"; -const string Database::_adapterDbName = "adapters"; -const string Database::_objectDbName = "objects"; -const string Database::_internalObjectDbName = "internal-objects"; +#ifdef QTSQL +using namespace IceSQL; +#else +using namespace Freeze; +#endif namespace IceGrid { @@ -88,16 +96,20 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), - _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)), - _applications(_connection, _applicationDbName), - _adapters(_connection, _adapterDbName), - _objects(_connection, _objectDbName), - _internalObjects(_connection, _internalObjectDbName), + _databaseCache(new IceGrid::DatabaseCache(_communicator, _envName, _instanceName, info.name)), _lock(0), _applicationSerial(0) { ServerEntrySeq entries; - for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p) + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); +#ifdef QTSQL + StringApplicationInfoDict applications = applicationsWrapper.getMap(); +#else + StringApplicationInfoDict& applications = applicationsWrapper.getMap(); +#endif + for(StringApplicationInfoDict::iterator p = applications.begin(); p != applications.end(); ++p) { try { @@ -119,9 +131,13 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); - _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, _applications); - _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _adapters); - _objectObserverTopic = new ObjectObserverTopic(_topicManager, _objects); + _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper.getMap()); + + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper.getMap()); + + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper.getMap()); _registryObserverTopic->registryUp(info); } @@ -207,27 +223,31 @@ Database::unlock(AdminSessionI* session) } void -Database::syncApplications(const ApplicationInfoSeq& applications) +Database::syncApplications(const ApplicationInfoSeq& newApplications) { int serial = 0; // Initialize to prevent warning. { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + ServerEntrySeq entries; set<string> names; - for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) + + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p) { try { - StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); - if(s != _applications.end()) + try { - ApplicationHelper previous(_communicator, s->second.descriptor); + ApplicationInfo info = applicationsWrapper.find(p->descriptor.name); + ApplicationHelper previous(_communicator, info.descriptor); ApplicationHelper helper(_communicator, p->descriptor); reload(previous, helper, entries, p->uuid, p->revision); } - else + catch(const NotFoundException&) { load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); } @@ -237,27 +257,28 @@ Database::syncApplications(const ApplicationInfoSeq& applications) Ice::Warning warn(_traceLevels->logger); warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; } - _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); + applicationsWrapper.put(p->descriptor.name, *p); names.insert(p->descriptor.name); } - StringApplicationInfoDict::iterator s = _applications.begin(); - while(s != _applications.end()) +#ifdef QTSQL + StringApplicationInfoDict applications = applicationsWrapper.getMap(); +#else + StringApplicationInfoDict& applications = applicationsWrapper.getMap(); +#endif + StringApplicationInfoDict::iterator s = applications.begin(); + while(s != applications.end()) { if(names.find(s->first) == names.end()) { unload(ApplicationHelper(_communicator, s->second.descriptor), entries); - _applications.erase(s++); - } - else - { - ++s; + applicationsWrapper.erase(s->first); } + ++s; } ++_applicationSerial; - serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications); - + serial = _applicationObserverTopic->applicationInit(_applicationSerial, newApplications); txHolder.commit(); } _applicationObserverTopic->waitForSyncedSubscribers(serial); @@ -269,11 +290,15 @@ Database::syncAdapters(const AdapterInfoSeq& adapters) int serial; { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - _adapters.clear(); + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + adaptersWrapper.clear(); for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) { - _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); + adaptersWrapper.put(*r); } serial = _adapterObserverTopic->adapterInit(adapters); txHolder.commit(); @@ -287,11 +312,16 @@ Database::syncObjects(const ObjectInfoSeq& objects) int serial; { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - _objects.clear(); + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + + objectsWrapper.clear(); for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + objectsWrapper.put(q->proxy->ice_getIdentity(), *q); } serial = _objectObserverTopic->objectInit(objects); txHolder.commit(); @@ -302,6 +332,9 @@ Database::syncObjects(const ObjectInfoSeq& objects) void Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + ServerEntrySeq entries; { Lock sync(*this); @@ -309,10 +342,14 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) waitForUpdate(info.descriptor.name); - if(_applications.find(info.descriptor.name) != _applications.end()) + try { + applicationsWrapper.find(info.descriptor.name); throw DeploymentException("application `" + info.descriptor.name + "' already exists"); - } + } + catch(const NotFoundException&) + { + } ApplicationHelper helper(_communicator, info.descriptor, true); checkForAddition(helper); @@ -347,9 +384,9 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) int serial; { Lock sync(*this); - ++_applicationSerial; - _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); + ++_applicationSerial; + applicationsWrapper.put(info.descriptor.name, info); serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info); if(_traceLevels->application > 0) @@ -377,12 +414,16 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se waitForUpdate(update.descriptor.name); - StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name); - if(p == _applications.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + try + { + oldApp = applicationsWrapper.find(update.descriptor.name); + } + catch(const NotFoundException&) { throw ApplicationNotExistException(update.descriptor.name); } - oldApp = p->second; if(update.revision < 0) { @@ -415,12 +456,16 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS waitForUpdate(newDesc.name); - StringApplicationInfoDict::const_iterator p = _applications.find(newDesc.name); - if(p == _applications.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + try + { + oldApp = applicationsWrapper.find(newDesc.name); + } + catch(const NotFoundException&) { throw ApplicationNotExistException(newDesc.name); } - oldApp = p->second; ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, newDesc, true); @@ -455,12 +500,16 @@ Database::instantiateServer(const string& application, waitForUpdate(application); - StringApplicationInfoDict::const_iterator p = _applications.find(application); - if(p == _applications.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + try + { + oldApp = applicationsWrapper.find(application); + } + catch(const NotFoundException&) { throw ApplicationNotExistException(application); } - oldApp = p->second; ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true); @@ -484,6 +533,9 @@ Database::instantiateServer(const string& application, void Database::removeApplication(const string& name, AdminSessionI* session) { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + ServerEntrySeq entries; int serial; { @@ -492,8 +544,12 @@ Database::removeApplication(const string& name, AdminSessionI* session) waitForUpdate(name); - StringApplicationInfoDict::iterator p = _applications.find(name); - if(p == _applications.end()) + ApplicationInfo appInfo; + try + { + appInfo = applicationsWrapper.find(name); + } + catch(const NotFoundException&) { throw ApplicationNotExistException(name); } @@ -501,7 +557,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) bool init = false; try { - ApplicationHelper helper(_communicator, p->second.descriptor); + ApplicationHelper helper(_communicator, appInfo.descriptor); init = true; checkForRemove(helper); unload(helper, entries); @@ -520,7 +576,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) // } - startUpdating(name, p->second.uuid, p->second.revision); + startUpdating(name, appInfo.uuid, appInfo.revision); } if(_master) @@ -531,7 +587,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) { Lock sync(*this); - _applications.erase(name); + applicationsWrapper.erase(name); ++_applicationSerial; serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name); @@ -551,24 +607,24 @@ Database::removeApplication(const string& name, AdminSessionI* session) ApplicationInfo Database::getApplicationInfo(const std::string& name) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringApplicationInfoDict descriptors(connection, _applicationDbName); - - StringApplicationInfoDict::const_iterator p = descriptors.find(name); - if(p == descriptors.end()) + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + try + { + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + return applicationsWrapper.find(name); + } + catch(const NotFoundException&) { throw ApplicationNotExistException(name); } - - return p->second; } Ice::StringSeq Database::getAllApplications(const string& expression) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringApplicationInfoDict descriptors(connection, _applicationDbName); - return getMatchingKeys<StringApplicationInfoDict>(descriptors, expression); + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + return getMatchingKeys<StringApplicationInfoDict>(applicationsWrapper.getMap(), expression); } void @@ -648,17 +704,27 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr throw AdapterExistsException(adapterId); } - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); AdapterInfo info; + bool found = false; + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + try + { + info = adaptersWrapper.find(adapterId); + found = true; + } + catch(const NotFoundException&) + { + } bool updated = false; if(proxy) { - if(p != _adapters.end()) + if(found) { - info = p->second; info.proxy = proxy; info.replicaGroupId = replicaGroupId; - p.set(info); + adaptersWrapper.put(info); updated = true; } else @@ -666,16 +732,16 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr info.id = adapterId; info.proxy = proxy; info.replicaGroupId = replicaGroupId; - _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + adaptersWrapper.put(info); } } else { - if(p == _adapters.end()) + if(!found) { return; } - _adapters.erase(p); + adaptersWrapper.erase(adapterId); } if(_traceLevels->adapter > 0) @@ -710,18 +776,21 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr Ice::ObjectPrx Database::getAdapterDirectProxy(const string& id) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, _adapterDbName); - StringAdapterInfoDict::const_iterator p = adapters.find(id); - if(p != adapters.end()) + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + try + { + return adaptersWrapper.find(id).proxy; + } + catch(const NotFoundException&) { - return p->second.proxy; } Ice::EndpointSeq endpoints; - for(p = adapters.findByReplicaGroupId(id, true); p != adapters.end(); ++p) + vector<AdapterInfo> infos = adaptersWrapper.findByReplicaGroupId(id); + for(unsigned int i = 0; i < infos.size(); ++i) { - Ice::EndpointSeq edpts = p->second.proxy->ice_getEndpoints(); + Ice::EndpointSeq edpts = infos[i].proxy->ice_getEndpoints(); endpoints.insert(endpoints.end(), edpts.begin(), edpts.end()); } if(!endpoints.empty()) @@ -747,29 +816,28 @@ Database::removeAdapter(const string& adapterId) throw ex; } - Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator - - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + AdapterInfoSeq infos; - if(p != _adapters.end()) + try { - _adapters.erase(p); + adaptersWrapper.find(adapterId); + adaptersWrapper.erase(adapterId); } - else + catch(const NotFoundException&) { - p = _adapters.findByReplicaGroupId(adapterId, true); - if(p == _adapters.end()) + infos = adaptersWrapper.findByReplicaGroupId(adapterId); + if(infos.size() == 0) { throw AdapterNotExistException(adapterId); } - - while(p != _adapters.end()) + for(unsigned int i = 0; i < infos.size(); ++i) { - AdapterInfo info = p->second; - info.replicaGroupId = ""; - infos.push_back(info); - _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); - ++p; + infos[i].replicaGroupId = ""; + adaptersWrapper.put(infos[i]); } } @@ -790,7 +858,6 @@ Database::removeAdapter(const string& adapterId) serial = _adapterObserverTopic->adapterUpdated(*p); } } - txHolder.commit(); } _adapterObserverTopic->waitForSyncedSubscribers(serial); @@ -836,33 +903,26 @@ Database::getAdapterInfo(const string& id) // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, _adapterDbName); - StringAdapterInfoDict::const_iterator p = adapters.find(id); - if(p != adapters.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + AdapterInfoSeq infos; + try { - AdapterInfoSeq infos; - infos.push_back(p->second); - return infos; + infos.push_back(adaptersWrapper.find(id)); } - - // - // If it's not a regular object adapter, perhaps it's a replica - // group... - // - p = adapters.findByReplicaGroupId(id, true); - if(p != adapters.end()) + catch(const NotFoundException&) { - AdapterInfoSeq infos; - while(p != adapters.end()) + // + // If it's not a regular object adapter, perhaps it's a replica + // group... + // + infos = adaptersWrapper.findByReplicaGroupId(id); + if(infos.size() == 0) { - infos.push_back(p->second); - ++p; + throw AdapterNotExistException(id); } - return infos; } - - throw AdapterNotExistException(id); + return infos; } @@ -874,7 +934,15 @@ Database::getAllAdapters(const string& expression) vector<string> ids = _adapterCache.getAll(expression); result.swap(ids); set<string> groups; - for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); +#ifdef QTSQL + StringAdapterInfoDict adapters = adaptersWrapper.getMap(); +#else + StringAdapterInfoDict& adapters = adaptersWrapper.getMap(); +#endif + for(StringAdapterInfoDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { if(expression.empty() || IceUtilInternal::match(p->first, expression, true)) { @@ -910,11 +978,17 @@ Database::addObject(const ObjectInfo& info) throw ObjectExistsException(id); } - if(_objects.find(id) != _objects.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + try { + objectsWrapper.find(id); throw ObjectExistsException(id); } - _objects.put(IdentityObjectInfoDict::value_type(id, info)); + catch(const NotFoundException&) + { + } + objectsWrapper.put(id, info); serial = _objectObserverTopic->objectAdded(info); @@ -940,8 +1014,18 @@ Database::addOrUpdateObject(const ObjectInfo& info) throw ObjectExistsException(id); } - bool update = _objects.find(id) != _objects.end(); - _objects.put(IdentityObjectInfoDict::value_type(id, info)); + bool update = false; + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + try + { + objectsWrapper.find(id); + update = true; + } + catch(const NotFoundException&) + { + } + objectsWrapper.put(id, info); if(update) { @@ -977,14 +1061,20 @@ Database::removeObject(const Ice::Identity& id) throw ex; } - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + try + { + objectsWrapper.find(id); + } + catch(const NotFoundException&) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - _objects.erase(p); + + objectsWrapper.erase(id); serial = _objectObserverTopic->objectRemoved(id); @@ -1015,18 +1105,23 @@ Database::updateObject(const Ice::ObjectPrx& proxy) throw ex; } - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + + ObjectInfo info; + try + { + info = objectsWrapper.find(id); + } + catch(const NotFoundException&) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - ObjectInfo info; - info = p->second; info.proxy = proxy; - p.set(info); + objectsWrapper.put(id, info); serial = _objectObserverTopic->objectUpdated(info); @@ -1043,12 +1138,17 @@ int Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + objectsWrapper.put(p->proxy->ice_getIdentity(), *p); } int serial = _objectObserverTopic->objectsAddedOrUpdated(objects); + txHolder.commit(); return serial; } @@ -1057,12 +1157,17 @@ void Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txHolder(connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - _objects.erase(p->proxy->ice_getIdentity()); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + objectsWrapper.erase(p->proxy->ice_getIdentity()); } _objectObserverTopic->objectsRemoved(objects); + txHolder.commit(); } @@ -1080,16 +1185,18 @@ Database::getObjectProxy(const Ice::Identity& id) { } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - IdentityObjectInfoDict::const_iterator p = objects.find(id); - if(p == objects.end()) + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + try + { + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + return objectsWrapper.find(id).proxy; + } + catch(const NotFoundException&) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - return p->second.proxy; } Ice::ObjectPrx @@ -1139,11 +1246,13 @@ Ice::ObjectProxySeq Database::getObjectsByType(const string& type) { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) + + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + vector<ObjectInfo> infos = objectsWrapper.findByType(type); + for(unsigned int i = 0; i < infos.size(); ++i) { - proxies.push_back(p->second.proxy); + proxies.push_back(infos[i].proxy); } return proxies; } @@ -1160,22 +1269,30 @@ Database::getObjectInfo(const Ice::Identity& id) { } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - IdentityObjectInfoDict::const_iterator p = objects.find(id); - if(p == objects.end()) + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + try + { + return objectsWrapper.find(id); + } + catch(const NotFoundException&) { throw ObjectNotRegisteredException(id); } - return p->second; } ObjectInfoSeq Database::getAllObjectInfos(const string& expression) { ObjectInfoSeq infos = _objectCache.getAll(expression); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); + + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); +#ifdef QTSQL + IdentityObjectInfoDict objects = objectsWrapper.getMap(); +#else + IdentityObjectInfoDict& objects = objectsWrapper.getMap(); +#endif for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) { if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(p->first), expression, true)) @@ -1190,11 +1307,13 @@ ObjectInfoSeq Database::getObjectInfosByType(const string& type) { ObjectInfoSeq infos = _objectCache.getAllByType(type); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) + + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + ObjectInfoSeq dbInfos = objectsWrapper.findByType(type); + for(unsigned int i = 0; i < dbInfos.size(); ++i) { - infos.push_back(p->second); + infos.push_back(dbInfos[i]); } return infos; } @@ -1204,36 +1323,54 @@ Database::addInternalObject(const ObjectInfo& info, bool replace) { Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - if(!replace && _internalObjects.find(id) != _internalObjects.end()) + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); + if(!replace) { - throw ObjectExistsException(id); + try + { + internalObjectsWrapper.find(id); + throw ObjectExistsException(id); + } + catch(const NotFoundException&) + { + } } - _internalObjects.put(IdentityObjectInfoDict::value_type(id, info)); + internalObjectsWrapper.put(id, info); } void Database::removeInternalObject(const Ice::Identity& id) { Lock sync(*this); - IdentityObjectInfoDict::iterator p = _internalObjects.find(id); - if(p == _internalObjects.end()) + + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); + try + { + internalObjectsWrapper.find(id); + } + catch(const NotFoundException&) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } - _internalObjects.erase(p); + internalObjectsWrapper.erase(id); } Ice::ObjectProxySeq Database::getInternalObjectsByType(const string& type) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict internalObjects(connection, _internalObjectDbName); Ice::ObjectProxySeq proxies; - for(IdentityObjectInfoDict::const_iterator p = internalObjects.findByType(type); p != internalObjects.end(); ++p) + + DatabaseConnectionPtr connection = _databaseCache->newConnection(); + InternalObjectsDictWrapper internalObjectsWrapper(_databaseCache, connection); + vector<ObjectInfo> infos = internalObjectsWrapper.findByType(type); + for(unsigned int i = 0; i < infos.size(); ++i) { - proxies.push_back(p->second.proxy); + proxies.push_back(infos[i].proxy); } return proxies; } @@ -1327,9 +1464,30 @@ Database::checkServerForAddition(const string& id) void Database::checkAdapterForAddition(const string& id) { - if(_adapterCache.has(id) || - _adapters.find(id) != _adapters.end() || - _adapters.findByReplicaGroupId(id) != _adapters.end()) + bool found = false; + if(_adapterCache.has(id)) + { + found = true; + } + else + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + AdaptersDictWrapper adaptersWrapper(_databaseCache, connection); + try + { + adaptersWrapper.find(id); + found = true; + } + catch(const NotFoundException&) + { + if(adaptersWrapper.findByReplicaGroupId(id).size() != 0) + { + found = true; + } + } + } + + if(found) { DeploymentException ex; ex.reason = "adapter `" + id + "' is already registered"; @@ -1340,9 +1498,26 @@ Database::checkAdapterForAddition(const string& id) void Database::checkObjectForAddition(const Ice::Identity& objectId) { - if(_objectCache.has(objectId) || - _allocatableObjectCache.has(objectId) || - _objects.find(objectId) != _objects.end()) + bool found = false; + if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId)) + { + found = true; + } + else + { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ObjectsDictWrapper objectsWrapper(_databaseCache, connection); + try + { + objectsWrapper.find(objectId); + found = true; + } + catch(const NotFoundException&) + { + } + } + + if(found) { DeploymentException ex; ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered"; @@ -1642,7 +1817,9 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, info.revision = update.revision; info.descriptor = newDesc; - _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + ApplicationsDictWrapper applicationsWrapper(_databaseCache, connection); + applicationsWrapper.put(update.descriptor.name, info); ++_applicationSerial; if(_traceLevels->application > 0) |