diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-06-08 13:05:22 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-06-08 13:05:22 +0200 |
commit | b2538adea86a17fccbacf87814361f050c97ae7e (patch) | |
tree | db900e7f606ca8ce9d114f11cf3ac2e9cc2abaf4 /cpp/src/IceGrid/Database.cpp | |
parent | Fix IceSSL.CertFile property typo (diff) | |
download | ice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.bz2 ice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.xz ice-b2538adea86a17fccbacf87814361f050c97ae7e.zip |
Fixed ICE-6573 - IceGrid application update failure when updating server with allocatables and clients are waiting to allocate the allocatable
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 181 |
1 files changed, 94 insertions, 87 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 7a52d8715e3..c38dfea7ac0 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -112,12 +112,12 @@ filterAdapterInfos(const string& filter, { adapterIds.push_back(p->id); } - + for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q) { adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx); } - + vector<AdapterInfo> filteredAdpts; filteredAdpts.reserve(infos.size()); for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q) @@ -140,9 +140,9 @@ getSerial(const Freeze::ConnectionPtr& connection, const string& dbName) SerialsDict dict(connection, serialsDbName); // - // If a serial number is provided, juste update the serial number from the database, + // If a serial number is provided, juste update the serial number from the database, // otherwise if the serial is 0, we increment the serial from the database. - // + // SerialsDict::iterator p = dict.find(dbName); if(p == dict.end()) { @@ -163,9 +163,9 @@ updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice: SerialsDict dict(connection, serialsDbName); // - // If a serial number is provided, juste update the serial number from the database, + // If a serial number is provided, juste update the serial number from the database, // otherwise if the serial is 0, we increment the serial from the database. - // + // SerialsDict::iterator p = dict.find(dbName); if(p == dict.end()) { @@ -215,7 +215,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _internalAdapter(registryAdapter), _topicManager(topicManager), _instanceName(instanceName), - _traceLevels(traceLevels), + _traceLevels(traceLevels), _master(info.name == "Master"), _readonly(readonly || !_master), _replicaCache(_communicator, topicManager), @@ -377,7 +377,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long halt(_communicator, ex); } } - + ServerEntrySeq entries; set<string> names; @@ -510,7 +510,7 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial) _objectObserverTopic->waitForSyncedSubscribers(serial); } -ApplicationInfoSeq +ApplicationInfoSeq Database::getApplications(Ice::Long& serial) const { for(;;) @@ -534,7 +534,7 @@ Database::getApplications(Ice::Long& serial) const } } -AdapterInfoSeq +AdapterInfoSeq Database::getAdapters(Ice::Long& serial) const { for(;;) @@ -558,7 +558,7 @@ Database::getAdapters(Ice::Long& serial) const } } -ObjectInfoSeq +ObjectInfoSeq Database::getObjects(Ice::Long& serial) const { for(;;) @@ -689,7 +689,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic } void -Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session, +Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); @@ -700,7 +700,7 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A IceUtil::UniquePtr<ApplicationHelper> helper; try { - Lock sync(*this); + Lock sync(*this); checkSessionLock(session); waitForUpdate(update.descriptor.name); @@ -760,7 +760,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n update.updateUser = _lockUserId; update.revision = oldApp.revision + 1; update.descriptor = helper->diff(*previous); - + startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1); } catch(const DatabaseException& ex) @@ -772,8 +772,8 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n } void -Database::instantiateServer(const string& application, - const string& node, +Database::instantiateServer(const string& application, + const string& node, const ServerInstanceDescriptor& instance, AdminSessionI* session) { @@ -790,7 +790,7 @@ Database::instantiateServer(const string& application, checkSessionLock(session); waitForUpdate(application); - + StringApplicationInfoDict::const_iterator i = _applications.find(application); if(i == _applications.end()) { @@ -832,7 +832,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon waitForUpdate(name); ApplicationInfo appInfo; - + StringApplicationInfoDict::const_iterator i = _applications.find(name); if(i == _applications.end()) { @@ -855,7 +855,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon throw; } } - + dbSerial = removeApplication(name, _connection, dbSerial); startUpdating(name, appInfo.uuid, appInfo.revision); @@ -905,7 +905,7 @@ Database::getAllApplications(const string& expression) void Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb, - const string& uuid, + const string& uuid, int revision) { Lock sync(*this); @@ -957,7 +957,7 @@ Database::getServer(const string& id) const return _serverCache.get(id); } -AllocatableObjectCache& +AllocatableObjectCache& Database::getAllocatableObjectCache() { return _allocatableObjectCache; @@ -970,7 +970,7 @@ Database::getAllocatableObject(const Ice::Identity& id) const } void -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy, +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy, Ice::Long dbSerial) { assert(dbSerial != 0 || _master); @@ -1039,7 +1039,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr } out << " (serial = `" << dbSerial << "')"; } - + if(proxy) { if(updated) @@ -1151,7 +1151,7 @@ Database::removeAdapter(const string& adapterId) Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "' (serial = `" << dbSerial << "')"; } - + if(infos.empty()) { serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId); @@ -1174,13 +1174,13 @@ Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate); } -void -Database::getLocatorAdapterInfo(const string& id, +void +Database::getLocatorAdapterInfo(const string& id, const Ice::ConnectionPtr& connection, const Ice::Context& context, - LocatorAdapterInfoSeq& adpts, - int& count, - bool& replicaGroup, + LocatorAdapterInfoSeq& adpts, + int& count, + bool& replicaGroup, bool& roundRobin, const set<string>& excludes) { @@ -1189,7 +1189,7 @@ Database::getLocatorAdapterInfo(const string& id, Lock sync(*this); // Make sure this isn't call during an update. _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes); } - + if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty()) { vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter); @@ -1200,12 +1200,12 @@ Database::getLocatorAdapterInfo(const string& id, { adapterIds.push_back(q->id); } - + for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q) { adapterIds = (*q)->filter(id, adapterIds, connection, context); } - + LocatorAdapterInfoSeq filteredAdpts; filteredAdpts.reserve(adpts.size()); for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q) @@ -1225,7 +1225,7 @@ Database::getLocatorAdapterInfo(const string& id, } bool -Database::addAdapterSyncCallback(const string& id, +Database::addAdapterSyncCallback(const string& id, const SynchronizationCallbackPtr& callback, const std::set<std::string>& excludes) { @@ -1291,7 +1291,7 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con ReplicaGroupEntryPtr replicaGroup; { Lock sync(*this); // Make sure this isn't call during an update. - + AdapterEntryPtr entry = _adapterCache.get(id); infos = entry->getAdapterInfo(); replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry); @@ -1423,14 +1423,14 @@ Database::addObject(const ObjectInfo& info) int serial = 0; { - Lock sync(*this); + Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - + if(_objectCache.has(id)) { throw ObjectExistsException(id); } - + Ice::Long dbSerial = 0; for(;;) { @@ -1456,7 +1456,7 @@ Database::addObject(const ObjectInfo& info) halt(_communicator, ex); } } - + serial = _objectObserverTopic->objectAdded(dbSerial, info); if(_traceLevels->object > 0) @@ -1475,14 +1475,14 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) int serial = 0; // Initialize to prevent warning. { - Lock sync(*this); + Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); - + if(_objectCache.has(id)) { throw ObjectExistsException(id); } - + bool update = false; for(;;) { @@ -1512,7 +1512,7 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) halt(_communicator, ex); } } - + if(update) { serial = _objectObserverTopic->objectUpdated(dbSerial, info); @@ -1521,7 +1521,7 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial) { serial = _objectObserverTopic->objectAdded(dbSerial, info); } - + if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); @@ -1548,7 +1548,7 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) ex.reason += "'"; throw ex; } - + for(;;) { try @@ -1561,8 +1561,8 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) ex.id = id; throw ex; } - - _objects.erase(i); + + _objects.erase(i); dbSerial = updateSerial(_connection, objectsDbName, dbSerial); txHolder.commit(); break; @@ -1578,7 +1578,7 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial) } serial = _objectObserverTopic->objectRemoved(dbSerial, id); - + if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); @@ -1595,8 +1595,8 @@ Database::updateObject(const Ice::ObjectPrx& proxy) int serial = 0; { - Lock sync(*this); - + Lock sync(*this); + const Ice::Identity id = proxy->ice_getIdentity(); if(_objectCache.has(id)) { @@ -1607,7 +1607,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) ex.reason += "'"; throw ex; } - + ObjectInfo info; Ice::Long dbSerial = 0; for(;;) @@ -1615,7 +1615,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) try { TransactionHolder txHolder(_connection); - IdentityObjectInfoDict::iterator i = _objects.find(id); + IdentityObjectInfoDict::iterator i = _objects.find(id); if(i == _objects.end()) { ObjectNotRegisteredException ex; @@ -1638,7 +1638,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy) halt(_communicator, ex); } } - + serial = _objectObserverTopic->objectUpdated(dbSerial, info); if(_traceLevels->object > 0) { @@ -1743,7 +1743,7 @@ Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, con } Ice::ObjectPrx -Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con, +Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con, const Ice::Context& ctx) { Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx); @@ -1969,10 +1969,10 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co app.getReplicaGroups(repGrps, adptRepGrps); for_each(adptRepGrps.begin(), adptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists)); } - + void -Database::checkForUpdate(const ApplicationHelper& origApp, - const ApplicationHelper& newApp, +Database::checkForUpdate(const ApplicationHelper& origApp, + const ApplicationHelper& newApp, const ConnectionPtr& connection) { set<string> oldSvrs, newSvrs; @@ -1982,7 +1982,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp, origApp.getIds(oldSvrs, oldAdpts, oldObjs); newApp.getIds(newSvrs, newAdpts, newObjs); - Ice::StringSeq addedSvrs; + Ice::StringSeq addedSvrs; set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs)); for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition)); @@ -2012,7 +2012,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp, set<string> oldAdptRepGrps, newAdptRepGrps; origApp.getReplicaGroups(oldRepGrps, oldAdptRepGrps); newApp.getReplicaGroups(newRepGrps, newAdptRepGrps); - + set<string> rmRepGrps; set_difference(oldRepGrps.begin(), oldRepGrps.end(), newRepGrps.begin(),newRepGrps.end(), set_inserter(rmRepGrps)); for_each(rmRepGrps.begin(), rmRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupForRemove)); @@ -2023,7 +2023,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp, for_each(addedAdptRepGrps.begin(), addedAdptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists)); vector<string> invalidAdptRepGrps; - set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(), + set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(), back_inserter(invalidAdptRepGrps)); if(!invalidAdptRepGrps.empty()) { @@ -2048,7 +2048,7 @@ Database::checkServerForAddition(const string& id) if(_serverCache.has(id)) { DeploymentException ex; - ex.reason = "server `" + id + "' is already registered"; + ex.reason = "server `" + id + "' is already registered"; throw ex; } } @@ -2080,7 +2080,7 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict& if(found) { DeploymentException ex; - ex.reason = "adapter `" + id + "' is already registered"; + ex.reason = "adapter `" + id + "' is already registered"; throw ex; } } @@ -2105,7 +2105,7 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityOb if(found) { DeploymentException ex; - ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered"; + ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered"; throw ex; } } @@ -2146,16 +2146,16 @@ Database::checkReplicaGroupForRemove(const string& replicaGroup) { // // This would indicate an inconsistency with the cache and - // database. We don't print an error, it will be printed + // database. We don't print an error, it will be printed // when the application is actually removed. // return; } - + if(entry->hasAdaptersFromOtherApplications()) { DeploymentException ex; - ex.reason = "couldn't remove application because the replica group `" + replicaGroup + + ex.reason = "couldn't remove application because the replica group `" + replicaGroup + "' is used by object adapters from other applications."; throw ex; } @@ -2168,7 +2168,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri const string application = app.getInstance().name; for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) { - _nodeCache.get(n->first, true)->addDescriptor(application, n->second); + _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; @@ -2185,7 +2185,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri map<string, ServerInfo> servers = app.getServerInfos(uuid, revision); for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) { - entries.push_back(_serverCache.add(p->second, false)); + entries.push_back(_serverCache.add(p->second)); } } @@ -2195,7 +2195,7 @@ Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries) map<string, ServerInfo> servers = app.getServerInfos("", 0); for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p) { - entries.push_back(_serverCache.remove(p->first)); + entries.push_back(_serverCache.remove(p->first, false)); } const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; @@ -2217,11 +2217,11 @@ Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries) } void -Database::reload(const ApplicationHelper& oldApp, - const ApplicationHelper& newApp, - ServerEntrySeq& entries, - const string& uuid, - int revision, +Database::reload(const ApplicationHelper& oldApp, + const ApplicationHelper& newApp, + ServerEntrySeq& entries, + const string& uuid, + int revision, bool noRestart) { const string application = oldApp.getInstance().name; @@ -2231,18 +2231,18 @@ Database::reload(const ApplicationHelper& oldApp, // map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision); map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision); - vector<ServerInfo> load; + vector<pair<bool, ServerInfo> > load; for(map<string, ServerInfo>::const_iterator p = newServers.begin(); p != newServers.end(); ++p) { map<string, ServerInfo>::const_iterator q = oldServers.find(p->first); if(q == oldServers.end()) { - load.push_back(p->second); - } + load.push_back(make_pair(false, p->second)); + } else if(isServerUpdated(p->second, q->second)) { - _serverCache.remove(p->first, false); // Don't destroy the server if it was updated. - load.push_back(p->second); + _serverCache.preUpdate(p->second, noRestart); + load.push_back(make_pair(true, p->second)); } else { @@ -2256,7 +2256,7 @@ Database::reload(const ApplicationHelper& oldApp, map<string, ServerInfo>::const_iterator q = newServers.find(p->first); if(q == newServers.end()) { - entries.push_back(_serverCache.remove(p->first, true, noRestart)); + entries.push_back(_serverCache.remove(p->first, noRestart)); } } @@ -2328,9 +2328,16 @@ Database::reload(const ApplicationHelper& oldApp, // // Add back servers. // - for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q) + for(vector<pair<bool, ServerInfo> >::const_iterator q = load.begin(); q != load.end(); ++q) { - entries.push_back(_serverCache.add(*q, noRestart)); + if(q->first) // Update + { + entries.push_back(_serverCache.postUpdate(q->second, noRestart)); + } + else + { + entries.push_back(_serverCache.add(q->second)); + } } } @@ -2357,7 +2364,7 @@ Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& conn { halt(_communicator, ex); } - } + } return dbSerial; } @@ -2384,7 +2391,7 @@ Database::removeApplication(const string& name, const ConnectionPtr& connection, { halt(_communicator, ex); } - } + } return dbSerial; } @@ -2441,8 +2448,8 @@ Database::checkUpdate(const ApplicationHelper& oldApp, map<string, ServerInfo>::const_iterator q = oldServers.find(p->first); if(q != oldServers.end() && isServerUpdated(p->second, q->second)) { - if(noRestart && - p->second.node == q->second.node && + if(noRestart && + p->second.node == q->second.node && isServerUpdated(p->second, q->second, true)) // Ignore properties { // @@ -2584,7 +2591,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp, void Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, const ApplicationInfo& oldApp, - const ApplicationHelper& previous, + const ApplicationHelper& previous, const ApplicationHelper& helper, AdminSessionI* /*session*/, bool noRestart, @@ -2670,7 +2677,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update, newUpdate.updateUser = _lockUserId; newUpdate.revision = info.revision; newUpdate.descriptor = helper.diff(previous); - + vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name); assert(p != _updating.end()); p->unmarkUpdated(); |