From 91f6ebb998532b36fc70187b641a5b7404060422 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Tue, 3 Sep 2013 15:42:19 +0200 Subject: Fixed ICE-5358 - allow IceGrid replica to initialize its database from another replica --- cpp/src/IceGrid/Database.cpp | 382 +++++++++++++++++++++++++++++-------------- 1 file changed, 259 insertions(+), 123 deletions(-) (limited to 'cpp/src/IceGrid/Database.cpp') diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index f822bf576c8..14e454df6d5 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -43,6 +43,17 @@ struct ObjectLoadCI : binary_function&, pair vector +toVector(const map& m) +{ + vector v; + for(typename map::const_iterator p = m.begin(); p != m.end(); ++p) + { + v.push_back(p->second); + } + return v; +} + void halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { @@ -78,8 +89,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), _connectionPool(plugin->getConnectionPool()), _databasePlugin(plugin), - _lock(0), - _applicationSerial(0) + _lock(0) { ServerEntrySeq entries; @@ -108,13 +118,9 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); _registryObserverTopic = new RegistryObserverTopic(_topicManager); - _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applications); - - AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); - _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper->getMap()); - - ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); - _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper->getMap()); + _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper); + _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _connectionPool->getAdapters(connection)); + _objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection)); _registryObserverTopic->registryUp(info); } @@ -188,8 +194,8 @@ Database::lock(AdminSessionI* session, const string& userId) _lock = session; _lockUserId = userId; - - return _applicationSerial; + + return _applicationObserverTopic->getSerial(); } void @@ -206,9 +212,10 @@ Database::unlock(AdminSessionI* session) } void -Database::syncApplications(const ApplicationInfoSeq& newApplications) +Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long dbSerial) { - int serial = 0; // Initialize to prevent warning. + assert(dbSerial != 0); + int serial = 0; { Lock sync(*this); @@ -226,6 +233,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications) { applicationsWrapper->put(p->descriptor.name, *p); } + dbSerial = applicationsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -274,16 +282,24 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications) } } - ++_applicationSerial; - serial = _applicationObserverTopic->applicationInit(_applicationSerial, newApplications); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "synchronized applications (serial = `" << dbSerial << "')"; + } + + serial = _applicationObserverTopic->applicationInit(dbSerial, newApplications); } _applicationObserverTopic->waitForSyncedSubscribers(serial); } void -Database::syncAdapters(const AdapterInfoSeq& adapters) +Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial) { - int serial; + assert(dbSerial != 0); + int serial = 0; { Lock sync(*this); for(;;) @@ -298,6 +314,7 @@ Database::syncAdapters(const AdapterInfoSeq& adapters) { adaptersWrapper->put(r->id, *r); } + dbSerial = adaptersWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -310,15 +327,23 @@ Database::syncAdapters(const AdapterInfoSeq& adapters) halt(_communicator, ex); } } - serial = _adapterObserverTopic->adapterInit(adapters); + + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "synchronized adapters (serial = `" << dbSerial << "')"; + } + + serial = _adapterObserverTopic->adapterInit(dbSerial, adapters); } _adapterObserverTopic->waitForSyncedSubscribers(serial); } void -Database::syncObjects(const ObjectInfoSeq& objects) +Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial) { - int serial; + assert(dbSerial != 0); + int serial = 0; { Lock sync(*this); for(;;) @@ -333,6 +358,7 @@ Database::syncObjects(const ObjectInfoSeq& objects) { objectsWrapper->put(q->proxy->ice_getIdentity(), *q); } + dbSerial = objectsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -345,14 +371,102 @@ Database::syncObjects(const ObjectInfoSeq& objects) halt(_communicator, ex); } } - serial = _objectObserverTopic->objectInit(objects); + + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "synchronized objects (serial = `" << dbSerial << "')"; + } + + serial = _objectObserverTopic->objectInit(dbSerial, objects); } _objectObserverTopic->waitForSyncedSubscribers(serial); } +ApplicationInfoSeq +Database::getApplications(Ice::Long& serial) const +{ + for(;;) + { + try + { + DatabaseConnectionPtr connection = _connectionPool->newConnection(); + TransactionHolder txHolder(connection); + ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); + serial = applicationsWrapper->getSerial(); + return toVector(applicationsWrapper->getMap()); + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } +} + +AdapterInfoSeq +Database::getAdapters(Ice::Long& serial) const +{ + for(;;) + { + try + { + DatabaseConnectionPtr connection = _connectionPool->newConnection(); + TransactionHolder txHolder(connection); + AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection); + serial = adaptersWrapper->getSerial(); + return toVector(adaptersWrapper->getMap()); + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } +} + +ObjectInfoSeq +Database::getObjects(Ice::Long& serial) const +{ + for(;;) + { + try + { + DatabaseConnectionPtr connection = _connectionPool->newConnection(); + TransactionHolder txHolder(connection); + ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection); + serial = objectsWrapper->getSerial(); + return toVector(objectsWrapper->getMap()); + } + catch(const DeadlockException&) + { + continue; + } + catch(const DatabaseException& ex) + { + halt(_communicator, ex); + } + } +} + +StringLongDict +Database::getSerials() const +{ + return _connectionPool->getSerials(); +} + void -Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) +Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ice::Long dbSerial) { + assert(dbSerial != 0 || _master); + + int serial = 0; // Initialize to prevent warning. ServerEntrySeq entries; try { @@ -374,18 +488,33 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) ApplicationHelper helper(_communicator, info.descriptor, true); checkForAddition(helper, connection); - saveApplication(info, connection); + dbSerial = saveApplication(info, connection, dbSerial); load(helper, entries, info.uuid, info.revision); startUpdating(info.descriptor.name, info.uuid, info.revision); + + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + serial = _applicationObserverTopic->applicationAdded(dbSerial, info); } catch(const DatabaseException& ex) { halt(_communicator, ex); } + _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated. + + // + // Mark the application as updated. All the replicas received the update so it's now safe + // for the nodes to start the servers. + // + { + Lock sync(*this); + vector::iterator p = find(_updating.begin(), _updating.end(), info.descriptor.name); + assert(p != _updating.end()); + p->markUpdated(); + } + if(_master) { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); try { for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p) @@ -407,7 +536,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); - removeApplication(info.descriptor.name, _connectionPool->getConnection()); + dbSerial = removeApplication(info.descriptor.name, _connectionPool->getConnection()); + + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name); } catch(const DeploymentException& ex) { @@ -418,32 +550,27 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { halt(_communicator, ex); } + _applicationObserverTopic->waitForSyncedSubscribers(serial); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); finishUpdating(info.descriptor.name); throw ex; } } - int serial; + if(_traceLevels->application > 0) { - Lock sync(*this); - ++_applicationSerial; - serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info); - - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "added application `" << info.descriptor.name << "'"; - } + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "added application `" << info.descriptor.name << "'"; } - - _applicationObserverTopic->waitForSyncedSubscribers(serial); - finishUpdating(info.descriptor.name); } void -Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session) +Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session, + Ice::Long dbSerial) { + assert(dbSerial != 0 || _master); + ApplicationInfo oldApp; ApplicationUpdateInfo update = updt; IceUtil::UniquePtr previous; @@ -481,12 +608,14 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A halt(_communicator, ex); } - finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart); + finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial); } void Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool noRestart, AdminSessionI* session) { + assert(_master); + ApplicationUpdateInfo update; ApplicationInfo oldApp; IceUtil::UniquePtr previous; @@ -533,6 +662,8 @@ Database::instantiateServer(const string& application, const ServerInstanceDescriptor& instance, AdminSessionI* session) { + assert(_master); + ApplicationUpdateInfo update; ApplicationInfo oldApp; IceUtil::UniquePtr previous; @@ -575,11 +706,12 @@ Database::instantiateServer(const string& application, } void -Database::removeApplication(const string& name, AdminSessionI* session) +Database::removeApplication(const string& name, AdminSessionI* session, Ice::Long dbSerial) { + assert(dbSerial != 0 || _master); ServerEntrySeq entries; - int serial; + int serial = 0; // Initialize to prevent warning. try { Lock sync(*this); @@ -605,7 +737,6 @@ Database::removeApplication(const string& name, AdminSessionI* session) ApplicationHelper helper(_communicator, appInfo.descriptor); init = true; checkForRemove(helper); - removeApplication(name, connection); unload(helper, entries); } catch(const DeploymentException&) @@ -614,42 +745,31 @@ Database::removeApplication(const string& name, AdminSessionI* session) { throw; } - - // - // For some reasons the application became invalid. If - // it's invalid, it's most likely not loaded either. So we - // ignore the error and erase the descriptor. - // - removeApplication(name, connection); } + dbSerial = removeApplication(name, connection, dbSerial); startUpdating(name, appInfo.uuid, appInfo.revision); + + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + serial = _applicationObserverTopic->applicationRemoved(dbSerial, name); } catch(const DatabaseException& ex) { halt(_communicator, ex); } + _applicationObserverTopic->waitForSyncedSubscribers(serial); if(_master) { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); } + if(_traceLevels->application > 0) { - Lock sync(*this); - ++_applicationSerial; - serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name); - - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "removed application `" << name << "'"; - } + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "removed application `" << name << "'"; } - _applicationObserverTopic->waitForSyncedSubscribers(serial); - finishUpdating(name); } @@ -743,9 +863,12 @@ Database::getAllocatableObject(const Ice::Identity& id) const } void -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy, + Ice::Long dbSerial) { - int serial = 0; + assert(dbSerial != 0 || _master); + + int serial = 0; // Initialize to prevent warning. { Lock sync(*this); if(_adapterCache.has(adapterId)) @@ -783,6 +906,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { adaptersWrapper->erase(adapterId); } + dbSerial = adaptersWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -810,16 +934,16 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { if(updated) { - serial = _adapterObserverTopic->adapterUpdated(info); + serial = _adapterObserverTopic->adapterUpdated(dbSerial, info); } else { - serial = _adapterObserverTopic->adapterAdded(info); + serial = _adapterObserverTopic->adapterAdded(dbSerial, info); } } else { - serial = _adapterObserverTopic->adapterRemoved(adapterId); + serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId); } } _adapterObserverTopic->waitForSyncedSubscribers(serial); @@ -859,6 +983,8 @@ Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& en void Database::removeAdapter(const string& adapterId) { + assert(_master); + int serial = 0; // Initialize to prevent warning. { Lock sync(*this); @@ -870,8 +996,9 @@ Database::removeAdapter(const string& adapterId) ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; throw ex; } - + AdapterInfoSeq infos; + Ice::Long dbSerial = 0; for(;;) { try @@ -897,6 +1024,7 @@ Database::removeAdapter(const string& adapterId) adaptersWrapper->put(p->id, *p); } } + dbSerial = adaptersWrapper->updateSerial(); txHolder.commit(); break; } @@ -909,6 +1037,7 @@ Database::removeAdapter(const string& adapterId) halt(_communicator, ex); } } + if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); @@ -917,13 +1046,13 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { - serial = _adapterObserverTopic->adapterRemoved(adapterId); + serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId); } else { for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { - serial = _adapterObserverTopic->adapterUpdated(*p); + serial = _adapterObserverTopic->adapterUpdated(dbSerial, *p); } } } @@ -1039,7 +1168,9 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info) { - int serial; + assert(_master); + + int serial = 0; { Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); @@ -1049,6 +1180,7 @@ Database::addObject(const ObjectInfo& info) throw ObjectExistsException(id); } + Ice::Long dbSerial = 0; for(;;) { try @@ -1065,6 +1197,7 @@ Database::addObject(const ObjectInfo& info) { } objectsWrapper->put(id, info); + dbSerial = objectsWrapper->updateSerial(); txHolder.commit(); break; } @@ -1078,7 +1211,7 @@ Database::addObject(const ObjectInfo& info) } } - serial = _objectObserverTopic->objectAdded(info); + serial = _objectObserverTopic->objectAdded(dbSerial, info); if(_traceLevels->object > 0) { @@ -1090,9 +1223,11 @@ Database::addObject(const ObjectInfo& info) } void -Database::addOrUpdateObject(const ObjectInfo& info) +Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) { - int serial; + assert(dbSerial != 0 || _master); + + int serial = 0; // Initialize to prevent warning. { Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); @@ -1119,6 +1254,7 @@ Database::addOrUpdateObject(const ObjectInfo& info) { } objectsWrapper->put(id, info); + dbSerial = objectsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -1134,11 +1270,11 @@ Database::addOrUpdateObject(const ObjectInfo& info) if(update) { - serial = _objectObserverTopic->objectUpdated(info); + serial = _objectObserverTopic->objectUpdated(dbSerial, info); } else { - serial = _objectObserverTopic->objectAdded(info); + serial = _objectObserverTopic->objectAdded(dbSerial, info); } if(_traceLevels->object > 0) @@ -1151,9 +1287,11 @@ Database::addOrUpdateObject(const ObjectInfo& info) } void -Database::removeObject(const Ice::Identity& id) +Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) { - int serial; + assert(dbSerial != 0 || _master); + + int serial = 0; // Initialize to prevent warning. { Lock sync(*this); if(_objectCache.has(id)) @@ -1184,7 +1322,8 @@ Database::removeObject(const Ice::Identity& id) throw ex; } - objectsWrapper->erase(id); + objectsWrapper->erase(id); + dbSerial = objectsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -1198,7 +1337,7 @@ Database::removeObject(const Ice::Identity& id) } } - serial = _objectObserverTopic->objectRemoved(id); + serial = _objectObserverTopic->objectRemoved(dbSerial, id); if(_traceLevels->object > 0) { @@ -1212,7 +1351,9 @@ Database::removeObject(const Ice::Identity& id) void Database::updateObject(const Ice::ObjectPrx& proxy) { - int serial; + assert(_master); + + int serial = 0; { Lock sync(*this); @@ -1228,6 +1369,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } ObjectInfo info; + Ice::Long dbSerial = 0; for(;;) { try @@ -1248,6 +1390,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) info.proxy = proxy; objectsWrapper->put(id, info); + dbSerial = objectsWrapper->updateSerial(); txHolder.commit(); break; } @@ -1261,7 +1404,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } } - serial = _objectObserverTopic->objectUpdated(info); + serial = _objectObserverTopic->objectUpdated(dbSerial, info); if(_traceLevels->object > 0) { @@ -1273,10 +1416,9 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } int -Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) +Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects) { Lock sync(*this); - for(;;) { try @@ -1300,15 +1442,13 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) halt(_communicator, ex); } } - int serial = _objectObserverTopic->objectsAddedOrUpdated(objects); - return serial; + return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects); } -void -Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) +int +Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects) { Lock sync(*this); - for(;;) { try @@ -1332,7 +1472,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) halt(_communicator, ex); } } - _objectObserverTopic->objectsRemoved(objects); + return _objectObserverTopic->wellKnownObjectsRemoved(objects); } Ice::ObjectPrx @@ -1924,7 +2064,6 @@ Database::reload(const ApplicationHelper& oldApp, // Remove all the node descriptors. // const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes; - for(NodeDescriptorDict::const_iterator n = oldNodes.begin(); n != oldNodes.end(); ++n) { _nodeCache.get(n->first)->removeDescriptor(application); @@ -1970,9 +2109,10 @@ Database::reload(const ApplicationHelper& oldApp, } } -void -Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection) +Ice::Long +Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection, Ice::Long dbSerial) { + assert(dbSerial != 0 || _master); for(;;) { try @@ -1980,6 +2120,7 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); TransactionHolder txHolder(connection); applicationsWrapper->put(info.descriptor.name, info); + dbSerial = applicationsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -1992,11 +2133,13 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP halt(_communicator, ex); } } + return dbSerial; } -void -Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection) +Ice::Long +Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection, Ice::Long dbSerial) { + assert(dbSerial != 0 || _master); for(;;) { try @@ -2004,6 +2147,7 @@ Database::removeApplication(const string& name, const DatabaseConnectionPtr& con ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection); TransactionHolder txHolder(connection); applicationsWrapper->erase(name); + dbSerial = applicationsWrapper->updateSerial(dbSerial); txHolder.commit(); break; } @@ -2016,6 +2160,7 @@ Database::removeApplication(const string& name, const DatabaseConnectionPtr& con halt(_communicator, ex); } } + return dbSerial; } void @@ -2199,12 +2344,14 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, const ApplicationHelper& previous, const ApplicationHelper& helper, AdminSessionI* /*session*/, - bool noRestart) + bool noRestart, + Ice::Long dbSerial) { const ApplicationDescriptor& newDesc = helper.getDefinition(); DatabaseConnectionPtr connection = _connectionPool->newConnection(); ServerEntrySeq entries; + int serial = 0; try { if(_master) @@ -2215,34 +2362,29 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, Lock sync(*this); checkForUpdate(previous, helper, connection); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1, noRestart); - } - catch(const DeploymentException&) - { - finishUpdating(update.descriptor.name); - throw; - } - - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - int serial; - { - Lock sync(*this); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); ApplicationInfo info = oldApp; info.updateTime = update.updateTime; info.updateUser = update.updateUser; info.revision = update.revision; info.descriptor = newDesc; - saveApplication(info, connection); + dbSerial = saveApplication(info, connection, dbSerial); - ++_applicationSerial; - serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update); + serial = _applicationObserverTopic->applicationUpdated(dbSerial, update); } - _applicationObserverTopic->waitForSyncedSubscribers(serial); + catch(const DeploymentException&) + { + finishUpdating(update.descriptor.name); + throw; + } + + _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated. // // Mark the application as updated. All the replicas received the update so it's now safe - // for the nodes to start the servers. + // for the nodes to start servers. // { Lock sync(*this); @@ -2278,7 +2420,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, ApplicationInfo info = oldApp; info.revision = update.revision + 1; - saveApplication(info, connection); + dbSerial = saveApplication(info, connection); reload(previous, helper, entries, info.uuid, info.revision, noRestart); newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds(); @@ -2289,19 +2431,13 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, vector::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name); assert(p != _updating.end()); p->unmarkUpdated(); - } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - int serial; - { - Lock sync(*this); - ++_applicationSerial; - serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, newUpdate); + serial = _applicationObserverTopic->applicationUpdated(dbSerial, newUpdate); } - _applicationObserverTopic->waitForSyncedSubscribers(serial); - + _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for subscriber to be updated. + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); finishUpdating(newDesc.name); throw ex; } -- cgit v1.2.3