diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 287 |
1 files changed, 157 insertions, 130 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index d0f64b6e7b8..ac63076458c 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -9,7 +9,6 @@ #include <IceUtil/StringUtil.h> #include <IceUtil/Random.h> -#include <IceUtil/UUID.h> #include <Freeze/Freeze.h> #include <IceGrid/Database.h> #include <IceGrid/TraceLevels.h> @@ -371,138 +370,145 @@ Database::getReplicatedEndpoints(const string& name, const Ice::ObjectPrx& proxy } void -Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc) +Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { ServerEntrySeq entries; - string uuid = IceUtil::generateUUID(); { Lock sync(*this); checkSessionLock(session); - while(_updating.find(desc.name) != _updating.end()) + while(_updating.find(info.descriptor.name) != _updating.end()) { wait(); } - if(_applications.find(desc.name) != _applications.end()) + if(_applications.find(info.descriptor.name) != _applications.end()) { - throw DeploymentException("application `" + desc.name + "' already exists"); + throw DeploymentException("application `" + info.descriptor.name + "' already exists"); } - ApplicationHelper helper(_communicator, desc); + ApplicationHelper helper(_communicator, info.descriptor); checkForAddition(helper); - load(helper, entries, uuid, 1); - _updating.insert(desc.name); + load(helper, entries, info.uuid, info.revision); + _updating.insert(info.descriptor.name); } // - // Synchronize the servers on the nodes. If a server couldn't be - // deployed we unload the application and throw. + // If the update is from an admin session, we synchronize the + // servers and throw if there's errors. // - try - { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - } - catch(const DeploymentException& ex) + if(session) { - { - Lock sync(*this); - entries.clear(); - unload(ApplicationHelper(_communicator, desc), entries); - _updating.erase(desc.name); - notifyAll(); - } + // + // Synchronize the servers on the nodes. If a server couldn't be + // deployed we unload the application and throw. + // try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } - catch(const DeploymentException&) + catch(const DeploymentException& ex) { - // TODO: warning? + { + Lock sync(*this); + entries.clear(); + unload(ApplicationHelper(_communicator, info.descriptor), entries); + _updating.erase(info.descriptor.name); + notifyAll(); + } + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException&) + { + // TODO: warning? + } + throw ex; } - throw ex; + } + else + { + // TODO: XXX: Synchronize the servers here?! } // // Save the application descriptor. // int serial; - ApplicationInfo info; { - Lock sync(*this); - - info.createTime = info.updateTime = IceUtil::Time::now().toMilliSeconds(); - info.createUser = info.updateUser = _lockUserId; - info.descriptor = desc; - info.revision = 1; - info.uuid = uuid; - - _applications.put(StringApplicationInfoDict::value_type(desc.name, info)); - + Lock sync(*this); + _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(desc.name); + _updating.erase(info.descriptor.name); notifyAll(); } - // - // Notify the observers. - // _applicationObserverTopic->applicationAdded(serial, info); + _replicaCache.waitForUpdateReplication("application", serial); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "added application `" << desc.name << "'"; + out << "added application `" << info.descriptor.name << "'"; } } void -Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update) +Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* session) { ServerEntrySeq entries; ApplicationInfo oldApp; ApplicationDescriptor newDesc; + ApplicationUpdateInfo update = updt; { Lock sync(*this); checkSessionLock(session); - while(_updating.find(update.name) != _updating.end()) + while(_updating.find(update.descriptor.name) != _updating.end()) { wait(); } - StringApplicationInfoDict::const_iterator p = _applications.find(update.name); + StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name); if(p == _applications.end()) { - throw ApplicationNotExistException(update.name); + throw ApplicationNotExistException(update.descriptor.name); } oldApp = p->second; + if(update.revision < 0) + { + update.revision = oldApp.revision + 1; + } + ApplicationHelper previous(_communicator, oldApp.descriptor); - ApplicationHelper helper(_communicator, previous.update(update)); + ApplicationHelper helper(_communicator, previous.update(update.descriptor)); checkForUpdate(previous, helper); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); newDesc = helper.getDefinition(); - _updating.insert(update.name); + _updating.insert(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc); + finishUpdate(entries, update, oldApp, newDesc, session); } void -Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& newDesc) +Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminSessionI* session) { + assert(session); + ServerEntrySeq entries; - ApplicationUpdateDescriptor update; + ApplicationUpdateInfo update; ApplicationInfo oldApp; { Lock sync(*this); checkSessionLock(session); - while(_updating.find(update.name) != _updating.end()) + while(_updating.find(update.descriptor.name) != _updating.end()) { wait(); } @@ -516,32 +522,38 @@ Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDes ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, newDesc); - update = helper.diff(previous); + + update.updateTime = IceUtil::Time::now().toMilliSeconds(); + update.updateUser = _lockUserId; + update.revision = oldApp.revision + 1; + update.descriptor = helper.diff(previous); checkForUpdate(previous, helper); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); - _updating.insert(update.name); + _updating.insert(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc); + finishUpdate(entries, update, oldApp, newDesc, session); } void -Database::instantiateServer(AdminSessionI* session, - const string& application, +Database::instantiateServer(const string& application, const string& node, - const ServerInstanceDescriptor& instance) + const ServerInstanceDescriptor& instance, + AdminSessionI* session) { + assert(session); + ServerEntrySeq entries; - ApplicationUpdateDescriptor update; + ApplicationUpdateInfo update; ApplicationInfo oldApp; ApplicationDescriptor newDesc; { Lock sync(*this); checkSessionLock(session); - while(_updating.find(update.name) != _updating.end()) + while(_updating.find(application) != _updating.end()) { wait(); } @@ -555,21 +567,25 @@ Database::instantiateServer(AdminSessionI* session, ApplicationHelper previous(_communicator, oldApp.descriptor); ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance)); - update = helper.diff(previous); + + update.updateTime = IceUtil::Time::now().toMilliSeconds(); + update.updateUser = _lockUserId; + update.revision = oldApp.revision + 1; + update.descriptor = helper.diff(previous); checkForUpdate(previous, helper); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); newDesc = helper.getDefinition(); - _updating.insert(update.name); + _updating.insert(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc); + finishUpdate(entries, update, oldApp, newDesc, session); } void -Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name) +Database::removeApplication(const string& name, AdminSessionI* session) { ServerEntrySeq entries; int serial; @@ -607,10 +623,8 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& serial = ++_applicationSerial; } - // - // Notify the observers - // _applicationObserverTopic->applicationRemoved(serial, name); + _replicaCache.waitForUpdateReplication("application", serial); if(_traceLevels->application > 0) { @@ -618,7 +632,14 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& out << "removed application `" << name << "'"; } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + if(session) + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + else + { + // TODO: XXX: synchronize the servers + } } ApplicationInfo @@ -707,11 +728,15 @@ Database::getAllNodes(const string& expression) void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { - _replicaCache.add(name, session); + // + // NOTE: this must be done before we add the replica to the cache + // in order for ReplicaCache::waitForUpdateReplication to work. + // + _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); + _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); + _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _applicationObserverTopic->subscribe(session->getObserver()); - _adapterObserverTopic->subscribe(session->getObserver()); - _objectObserverTopic->subscribe(session->getObserver()); + _replicaCache.add(name, session); } InternalRegistryPrx @@ -727,13 +752,19 @@ Database::getReplicaInfo(const string& name) const } void +Database::replicaReceivedUpdate(const string& name, const string& update, int serial) +{ + _replicaCache.replicaReceivedUpdate(name, update, serial); +} + +void Database::removeReplica(const string& name, const ReplicaSessionIPtr& session) { + _replicaCache.remove(name); + _applicationObserverTopic->unsubscribe(session->getObserver()); _adapterObserverTopic->unsubscribe(session->getObserver()); _objectObserverTopic->unsubscribe(session->getObserver()); - - _replicaCache.remove(name); } Ice::StringSeq @@ -843,6 +874,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { _adapterObserverTopic->adapterRemoved(serial, adapterId); } + _replicaCache.waitForUpdateReplication("adapter", serial); return true; } @@ -908,7 +940,7 @@ Database::removeAdapter(const string& adapterId) } else { - serial = _adapterSerial; + serial = _adapterSerial + 1; _adapterSerial += static_cast<int>(static_cast<int>(infos.size())); } } @@ -922,13 +954,21 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { _adapterObserverTopic->adapterRemoved(serial, adapterId); + _replicaCache.waitForUpdateReplication("adapter", serial); } else { - for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + int i = 0; + AdapterInfoSeq::const_iterator p; + for(p = infos.begin(); p != infos.end(); ++p, ++i) { - _adapterObserverTopic->adapterUpdated(++serial, *p); + _adapterObserverTopic->adapterUpdated(serial + i, *p); } + i = 0; + for(p = infos.begin(); p != infos.end(); ++p, ++i) + { + _replicaCache.waitForUpdateReplication("adapter", serial + i); + } } } @@ -1110,10 +1150,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) serial = ++_objectSerial; } - - // - // Notify the observers. - // + if(!update) { _objectObserverTopic->objectAdded(serial, info); @@ -1122,6 +1159,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { _objectObserverTopic->objectUpdated(serial, info); } + _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1165,10 +1203,8 @@ Database::removeObject(const Ice::Identity& id) serial = ++_objectSerial; } - // - // Notify the observers. - // _objectObserverTopic->objectRemoved(serial, id); + _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1210,10 +1246,8 @@ Database::updateObject(const Ice::ObjectPrx& proxy) serial = ++_objectSerial; } - // - // Notify the observers. - // _objectObserverTopic->objectUpdated(serial, info); + _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1241,9 +1275,6 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) txHolder.commit(); } - // - // Notify the observers. - // vector<bool>::const_iterator q = updated.begin(); for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p, ++q) { @@ -1288,10 +1319,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) _objectSerial += static_cast<int>(static_cast<int>(objects.size())); txHolder.commit(); } - - // - // Notify the observers. - // + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { _objectObserverTopic->objectRemoved(++serial, p->proxy->ice_getIdentity()); @@ -1721,70 +1749,69 @@ Database::reload(const ApplicationHelper& oldApp, void Database::finishUpdate(ServerEntrySeq& entries, - const ApplicationUpdateDescriptor& update, + const ApplicationUpdateInfo& update, const ApplicationInfo& oldApp, - const ApplicationDescriptor& newDesc) + const ApplicationDescriptor& newDesc, + AdminSessionI* session) { - - // - // Synchronize the servers on the nodes. If a server couldn't be - // deployed we unload the application and throw. - // - try - { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - } - catch(const DeploymentException& ex) + if(session) { - { - Lock sync(*this); - entries.clear(); - ApplicationHelper previous(_communicator, newDesc); - ApplicationHelper helper(_communicator, oldApp.descriptor); - reload(previous, helper, entries, oldApp.uuid, oldApp.revision); - _updating.erase(newDesc.name); - notifyAll(); - } + // + // Synchronize the servers on the nodes. If a server couldn't be + // deployed we unload the application and throw. + // try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } - catch(const DeploymentException&) + catch(const DeploymentException& ex) { - // TODO: warning? + { + Lock sync(*this); + entries.clear(); + ApplicationHelper previous(_communicator, newDesc); + ApplicationHelper helper(_communicator, oldApp.descriptor); + reload(previous, helper, entries, oldApp.uuid, oldApp.revision); + _updating.erase(newDesc.name); + notifyAll(); + } + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException&) + { + // TODO: warning? + } + throw ex; } - throw ex; } // // Save the application descriptor. // int serial; - ApplicationUpdateInfo updateInfo; { Lock sync(*this); ApplicationInfo info = oldApp; - info.updateTime = updateInfo.updateTime = IceUtil::Time::now().toMilliSeconds(); - info.updateUser = updateInfo.updateUser = _lockUserId; - info.revision = updateInfo.revision = oldApp.revision + 1; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; info.descriptor = newDesc; - updateInfo.descriptor = update; - _applications.put(StringApplicationInfoDict::value_type(update.name, info)); + _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(update.name); + _updating.erase(update.descriptor.name); notifyAll(); } - // - // Notify the observers. - // - _applicationObserverTopic->applicationUpdated(serial, updateInfo); + _applicationObserverTopic->applicationUpdated(serial, update); + _replicaCache.waitForUpdateReplication("application", serial); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "updated application `" << update.name << "'"; + out << "updated application `" << update.descriptor.name << "'"; } } |