diff options
author | Benoit Foucher <benoit@zeroc.com> | 2010-02-19 18:45:14 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2010-02-19 18:45:14 +0100 |
commit | ea9a8bcfc079175205766491ab217e65536a656f (patch) | |
tree | 8dd6cd069f9966bd59d72587052a42195d45cf63 /cpp/src/IceGrid/Database.cpp | |
parent | added txt to ICE_LICENSE (diff) | |
download | ice-ea9a8bcfc079175205766491ab217e65536a656f.tar.bz2 ice-ea9a8bcfc079175205766491ab217e65536a656f.tar.xz ice-ea9a8bcfc079175205766491ab217e65536a656f.zip |
Fixed bug 4677 - IceGrid update hang
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 124 |
1 files changed, 77 insertions, 47 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 5f74ff08031..9ea5c095520 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -409,9 +409,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) if(_master) { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); try { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); + for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p) + { + try + { + (*p)->waitForSync(); + } + catch(const NodeUnreachableException&) + { + // Ignore. + } + } } catch(const DeploymentException& ex) { @@ -680,7 +691,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) if(_master) { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); } { @@ -731,7 +742,7 @@ Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdat Lock sync(*this); vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), make_pair(uuid, revision)); - if(p != _updating.end()) + if(p != _updating.end() && !p->updated) { p->cbs.push_back(cb); } @@ -977,7 +988,7 @@ Database::removeAdapter(const string& adapterId) AdapterPrx Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, bool upToDate) { - Lock sync(*this); // make sure this isn't call during an update. + Lock sync(*this); // Make sure this isn't call during an update. return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate); } @@ -993,6 +1004,13 @@ Database::getLocatorAdapterInfo(const string& id, _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, excludes); } +bool +Database::addAdapterSyncCallback(const string& id, const SynchronizationCallbackPtr& callback) +{ + Lock sync(*this); // Make sure this isn't call during an update. + return _adapterCache.get(id)->addSyncCallback(callback); +} + AdapterInfoSeq Database::getAdapterInfo(const string& id) { @@ -1036,7 +1054,6 @@ Database::getAdapterInfo(const string& id) return infos; } - Ice::StringSeq Database::getAllAdapters(const string& expression) { @@ -2068,15 +2085,42 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, const ApplicationDescriptor& newDesc, AdminSessionI* session) { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + + int serial; + { + Lock sync(*this); + ++_applicationSerial; + serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update); + } + _applicationObserverTopic->waitForSyncedSubscribers(serial); + + // + // Mark the application as updated. All the replicas received the update so it's now safe + // for the nodes to start the servers. + // + { + Lock sync(*this); + vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name); + assert(p != _updating.end()); + p->markUpdated(); + } + if(_master) { - // - // Load the servers on the nodes. If a server couldn't be - // deployed we unload the application and throw. - // try { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); + for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p) + { + try + { + (*p)->waitForSync(); + } + catch(const NodeUnreachableException&) + { + // Ignore. + } + } } catch(const DeploymentException& ex) { @@ -2086,47 +2130,37 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, entries.clear(); ApplicationHelper previous(_communicator, newDesc); ApplicationHelper helper(_communicator, oldApp.descriptor); - saveApplication(oldApp, _databaseCache->getConnection()); - reload(previous, helper, entries, oldApp.uuid, oldApp.revision); - } - try - { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); + ApplicationInfo info = oldApp; + info.revision = update.revision + 1; + saveApplication(info, _databaseCache->getConnection()); + reload(previous, helper, entries, info.uuid, info.revision); + + newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds(); + newUpdate.updateUser = _lockUserId; + newUpdate.revision = info.revision; + newUpdate.descriptor = helper.diff(previous); + + vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name); + assert(p != _updating.end()); + p->unmarkUpdated(); } - catch(const DeploymentException& ex) + + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow)); + + int serial; { - Ice::Error err(_traceLevels->logger); - err << "failed to rollback previous application `" << oldApp.descriptor.name << "':\n" << ex.reason; + Lock sync(*this); + ++_applicationSerial; + serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, newUpdate); } + _applicationObserverTopic->waitForSyncedSubscribers(serial); finishUpdating(newDesc.name); throw ex; } } - else - { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); - } - - // - // Save the application descriptor. - // - int serial; - { - Lock sync(*this); - ++_applicationSerial; - - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "updated application `" << update.descriptor.name << "'"; - } - - serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update); - } - - _applicationObserverTopic->waitForSyncedSubscribers(serial); finishUpdating(update.descriptor.name); } @@ -2155,11 +2189,7 @@ Database::finishUpdating(const string& name) vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), name); assert(p != _updating.end()); - for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->cbs.begin(); q != p->cbs.end(); ++q) - { - (*q)->ice_response(); - } + p->markUpdated(); _updating.erase(p); - notifyAll(); } |