diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 507 |
1 files changed, 218 insertions, 289 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 9e7debbaaac..9b33ce7917a 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -140,9 +140,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objects(_connection, _objectDbName), _internalObjects(_connection, _internalObjectDbName), _lock(0), - _applicationSerial(0), - _adapterSerial(0), - _objectSerial(0) + _applicationSerial(0) { ServerEntrySeq entries; for(StringApplicationInfoDict::const_iterator p = _applications.begin(); p != _applications.end(); ++p) @@ -272,95 +270,82 @@ Database::unlock(AdminSessionI* session) void Database::syncApplications(const ApplicationInfoSeq& applications) { - int serial; + Lock sync(*this); + + Freeze::TransactionHolder txHolder(_connection); + ServerEntrySeq entries; + set<string> names; + for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) { - Lock sync(*this); - - Freeze::TransactionHolder txHolder(_connection); - ServerEntrySeq entries; - set<string> names; - for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) + try { - try + StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); + if(s != _applications.end()) { - StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); - if(s != _applications.end()) - { - ApplicationHelper previous(_communicator, s->second.descriptor); - ApplicationHelper helper(_communicator, p->descriptor); - reload(previous, helper, entries, p->uuid, p->revision); - } - else - { - load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); - } + ApplicationHelper previous(_communicator, s->second.descriptor); + ApplicationHelper helper(_communicator, p->descriptor); + reload(previous, helper, entries, p->uuid, p->revision); } - catch(const DeploymentException& ex) + else { - Ice::Warning warn(_traceLevels->logger); - warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; + load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); } - _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); - names.insert(p->descriptor.name); } - StringApplicationInfoDict::iterator s = _applications.begin(); - while(s != _applications.end()) + catch(const DeploymentException& ex) { - if(names.find(s->first) == names.end()) - { - unload(ApplicationHelper(_communicator, s->second.descriptor), entries); - _applications.erase(s++); - } - else - { - ++s; - } + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; } - serial = ++_applicationSerial; - txHolder.commit(); + _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); + names.insert(p->descriptor.name); } - _applicationObserverTopic->applicationInit(serial, applications); + StringApplicationInfoDict::iterator s = _applications.begin(); + while(s != _applications.end()) + { + if(names.find(s->first) == names.end()) + { + unload(ApplicationHelper(_communicator, s->second.descriptor), entries); + _applications.erase(s++); + } + else + { + ++s; + } + } + ++_applicationSerial; + + _applicationObserverTopic->applicationInit(_applicationSerial, applications); + + txHolder.commit(); } void Database::syncAdapters(const AdapterInfoSeq& adapters) { - int serial; + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + _adapters.clear(); + for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) { - Lock sync(*this); - - Freeze::TransactionHolder txHolder(_connection); - _adapters.clear(); - for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) - { - _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); - } - serial = ++_adapterSerial; - txHolder.commit(); - } - - _adapterObserverTopic->adapterInit(serial, adapters); + _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); + } + _adapterObserverTopic->adapterInit(adapters); + txHolder.commit(); } void Database::syncObjects(const ObjectInfoSeq& objects) { - int serial; + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + _objects.clear(); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { - Lock sync(*this); - - Freeze::TransactionHolder txHolder(_connection); - _objects.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) - { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); - } - serial = ++_objectSerial; - txHolder.commit(); + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); } - - _objectObserverTopic->objectInit(serial, objects); + _objectObserverTopic->objectInit(objects); + txHolder.commit(); } Ice::ObjectPrx @@ -418,25 +403,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) } } - int serial; { Lock sync(*this); - serial = ++_applicationSerial; + ++_applicationSerial; _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); - } - - _applicationObserverTopic->applicationAdded(serial, info); - - { - Lock sync(*this); finishUpdating(info.descriptor.name); - notifyAll(); - } - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "added application `" << info.descriptor.name << "'"; + _applicationObserverTopic->applicationAdded(_applicationSerial, info); + + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "added application `" << info.descriptor.name << "'"; + } + notifyAll(); } } @@ -570,7 +550,6 @@ void Database::removeApplication(const string& name, AdminSessionI* session) { ServerEntrySeq entries; - int serial; { Lock sync(*this); checkSessionLock(session); @@ -601,16 +580,15 @@ Database::removeApplication(const string& name, AdminSessionI* session) } _applications.erase(p); + ++_applicationSerial; - serial = ++_applicationSerial; - } - - _applicationObserverTopic->applicationRemoved(serial, name); + _applicationObserverTopic->applicationRemoved(_applicationSerial, name); - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "removed application `" << name << "'"; + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "removed application `" << name << "'"; + } } if(_master) @@ -799,45 +777,40 @@ Database::getAllNodeServers(const string& node) bool Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { + Lock sync(*this); + if(_adapterCache.has(adapterId)) + { + return false; + } + + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); AdapterInfo info; - int serial; bool updated = false; + if(proxy) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) + if(p != _adapters.end()) { - return false; - } - - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); - if(proxy) - { - if(p != _adapters.end()) - { - info = p->second; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - p.set(info); - updated = true; - } - else - { - info.id = adapterId; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); - } + info = p->second; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + p.set(info); + updated = true; } else { - if(p == _adapters.end()) - { - return true; - } - _adapters.erase(p); + info.id = adapterId; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + } + } + else + { + if(p == _adapters.end()) + { + return true; } - - serial = ++_adapterSerial; + _adapters.erase(p); } if(_traceLevels->adapter > 0) @@ -849,22 +822,23 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr out << " with replica group `" << replicaGroupId << "'"; } } - + if(proxy) { if(updated) { - _adapterObserverTopic->adapterUpdated(serial, info); + _adapterObserverTopic->adapterUpdated(info); } else { - _adapterObserverTopic->adapterAdded(serial, info); + _adapterObserverTopic->adapterAdded(info); } } else { - _adapterObserverTopic->adapterRemoved(serial, adapterId); + _adapterObserverTopic->adapterRemoved(adapterId); } + return true; } @@ -884,57 +858,42 @@ Database::getAdapterDirectProxy(const string& adapterId) void Database::removeAdapter(const string& adapterId) { - AdapterInfoSeq infos; - int serial; + Lock sync(*this); + if(_adapterCache.has(adapterId)) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) - { - AdapterEntryPtr adpt = _adapterCache.get(adapterId); - DeploymentException ex; - ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; - ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; - throw ex; - } - - Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator + AdapterEntryPtr adpt = _adapterCache.get(adapterId); + DeploymentException ex; + ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; + ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; + throw ex; + } + + Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); - if(p != _adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + AdapterInfoSeq infos; + if(p != _adapters.end()) + { + _adapters.erase(p); + } + else + { + p = _adapters.findByReplicaGroupId(adapterId, true); + if(p == _adapters.end()) { - _adapters.erase(p); + throw AdapterNotExistException(adapterId); } - else - { - p = _adapters.findByReplicaGroupId(adapterId, true); - if(p == _adapters.end()) - { - throw AdapterNotExistException(adapterId); - } - while(p != _adapters.end()) - { - AdapterInfo info = p->second; - info.replicaGroupId = ""; - infos.push_back(info); - _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); - ++p; - } - } - - txHolder.commit(); - - if(infos.empty()) + while(p != _adapters.end()) { - serial = ++_adapterSerial; - } - else - { - serial = _adapterSerial + 1; - _adapterSerial += static_cast<int>(static_cast<int>(infos.size())); + AdapterInfo info = p->second; + info.replicaGroupId = ""; + infos.push_back(info); + _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); + ++p; } } - + if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); @@ -943,17 +902,17 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { - _adapterObserverTopic->adapterRemoved(serial, adapterId); + _adapterObserverTopic->adapterRemoved(adapterId); } else { - int i = 0; - AdapterInfoSeq::const_iterator p; - for(p = infos.begin(); p != infos.end(); ++p, ++i) + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { - _adapterObserverTopic->adapterUpdated(serial + i, *p); + _adapterObserverTopic->adapterUpdated(*p); } } + + txHolder.commit(); } AdapterPrx @@ -1109,41 +1068,37 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { + Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - int serial; + + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + bool update = false; + if(_objects.find(id) != _objects.end()) { - Lock sync(*this); - if(_objectCache.has(id)) + if(!replaceIfExistsInDatabase) { throw ObjectExistsException(id); } - - if(_objects.find(id) != _objects.end()) + else { - if(!replaceIfExistsInDatabase) - { - throw ObjectExistsException(id); - } - else - { - update = true; - } + update = true; } - _objects.put(IdentityObjectInfoDict::value_type(id, info)); - - serial = ++_objectSerial; } + _objects.put(IdentityObjectInfoDict::value_type(id, info)); if(!update) { - _objectObserverTopic->objectAdded(serial, info); + _objectObserverTopic->objectAdded(info); } else { - _objectObserverTopic->objectUpdated(serial, info); + _objectObserverTopic->objectUpdated(info); } - + if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); @@ -1161,32 +1116,28 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) void Database::removeObject(const Ice::Identity& id) { - int serial; + Lock sync(*this); + if(_objectCache.has(id)) { - Lock sync(*this); - if(_objectCache.has(id)) - { - DeploymentException ex; - ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `"; - ex.reason += _objectCache.get(id)->getApplication(); - ex.reason += "'"; - throw ex; - } - - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - _objects.erase(p); + DeploymentException ex; + ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - serial = ++_objectSerial; + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + _objects.erase(p); - _objectObserverTopic->objectRemoved(serial, id); + + _objectObserverTopic->objectRemoved(id); if(_traceLevels->object > 0) { @@ -1198,38 +1149,34 @@ Database::removeObject(const Ice::Identity& id) void Database::updateObject(const Ice::ObjectPrx& proxy) { + Lock sync(*this); + const Ice::Identity id = proxy->ice_getIdentity(); - int serial; - ObjectInfo info; + if(_objectCache.has(id)) { - Lock sync(*this); - if(_objectCache.has(id)) - { - DeploymentException ex; - ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `"; - ex.reason += _objectCache.get(id)->getApplication(); - ex.reason += "'"; - throw ex; - } - - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - - info = p->second; - info.proxy = proxy; - p.set(info); - - serial = ++_objectSerial; + DeploymentException ex; + ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } + + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } - _objectObserverTopic->objectUpdated(serial, info); - + ObjectInfo info; + info = p->second; + info.proxy = proxy; + p.set(info); + + _objectObserverTopic->objectUpdated(info); + if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); @@ -1240,40 +1187,28 @@ Database::updateObject(const Ice::ObjectPrx& proxy) int Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) { - int serial; - vector<bool> updated; + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) - { - updated.push_back(_objects.find(p->proxy->ice_getIdentity()) != _objects.end()); - _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); - } - serial = ++_objectSerial; - txHolder.commit(); + _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); } - - _objectObserverTopic->objectsAddedOrUpdated(serial, objects); + int serial = _objectObserverTopic->objectsAddedOrUpdated(objects); + txHolder.commit(); return serial; } void Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) { - int serial; + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { - Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) - { - _objects.erase(p->proxy->ice_getIdentity()); - } - serial = ++_objectSerial; - txHolder.commit(); + _objects.erase(p->proxy->ice_getIdentity()); } - - _objectObserverTopic->objectsRemoved(serial, objects); + _objectObserverTopic->objectsRemoved(objects); + txHolder.commit(); } void @@ -1760,33 +1695,27 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, // // Save the application descriptor. // - int serial; - { - Lock sync(*this); - - ApplicationInfo info = oldApp; - info.updateTime = update.updateTime; - info.updateUser = update.updateUser; - info.revision = update.revision; - info.descriptor = newDesc; - - _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); - serial = ++_applicationSerial; - } - - _applicationObserverTopic->applicationUpdated(serial, update); - - { - Lock sync(*this); - finishUpdating(update.descriptor.name); - notifyAll(); - } - + Lock sync(*this); + + ApplicationInfo info = oldApp; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; + info.descriptor = newDesc; + + _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); + ++_applicationSerial; + finishUpdating(update.descriptor.name); + if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "updated application `" << update.descriptor.name << "'"; } + + _applicationObserverTopic->applicationUpdated(_applicationSerial, update); + + notifyAll(); } void |