summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-06-08 13:05:22 +0200
committerBenoit Foucher <benoit@zeroc.com>2015-06-08 13:05:22 +0200
commitb2538adea86a17fccbacf87814361f050c97ae7e (patch)
treedb900e7f606ca8ce9d114f11cf3ac2e9cc2abaf4 /cpp/src/IceGrid/Database.cpp
parentFix IceSSL.CertFile property typo (diff)
downloadice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.bz2
ice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.xz
ice-b2538adea86a17fccbacf87814361f050c97ae7e.zip
Fixed ICE-6573 - IceGrid application update failure when updating server with allocatables and clients are waiting to allocate the allocatable
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp181
1 files changed, 94 insertions, 87 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 7a52d8715e3..c38dfea7ac0 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -112,12 +112,12 @@ filterAdapterInfos(const string& filter,
{
adapterIds.push_back(p->id);
}
-
+
for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
{
adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx);
}
-
+
vector<AdapterInfo> filteredAdpts;
filteredAdpts.reserve(infos.size());
for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
@@ -140,9 +140,9 @@ getSerial(const Freeze::ConnectionPtr& connection, const string& dbName)
SerialsDict dict(connection, serialsDbName);
//
- // If a serial number is provided, juste update the serial number from the database,
+ // If a serial number is provided, juste update the serial number from the database,
// otherwise if the serial is 0, we increment the serial from the database.
- //
+ //
SerialsDict::iterator p = dict.find(dbName);
if(p == dict.end())
{
@@ -163,9 +163,9 @@ updateSerial(const Freeze::ConnectionPtr& connection, const string& dbName, Ice:
SerialsDict dict(connection, serialsDbName);
//
- // If a serial number is provided, juste update the serial number from the database,
+ // If a serial number is provided, juste update the serial number from the database,
// otherwise if the serial is 0, we increment the serial from the database.
- //
+ //
SerialsDict::iterator p = dict.find(dbName);
if(p == dict.end())
{
@@ -215,7 +215,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_internalAdapter(registryAdapter),
_topicManager(topicManager),
_instanceName(instanceName),
- _traceLevels(traceLevels),
+ _traceLevels(traceLevels),
_master(info.name == "Master"),
_readonly(readonly || !_master),
_replicaCache(_communicator, topicManager),
@@ -377,7 +377,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long
halt(_communicator, ex);
}
}
-
+
ServerEntrySeq entries;
set<string> names;
@@ -510,7 +510,7 @@ Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
-ApplicationInfoSeq
+ApplicationInfoSeq
Database::getApplications(Ice::Long& serial) const
{
for(;;)
@@ -534,7 +534,7 @@ Database::getApplications(Ice::Long& serial) const
}
}
-AdapterInfoSeq
+AdapterInfoSeq
Database::getAdapters(Ice::Long& serial) const
{
for(;;)
@@ -558,7 +558,7 @@ Database::getAdapters(Ice::Long& serial) const
}
}
-ObjectInfoSeq
+ObjectInfoSeq
Database::getObjects(Ice::Long& serial) const
{
for(;;)
@@ -689,7 +689,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ic
}
void
-Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session,
+Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session,
Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
@@ -700,7 +700,7 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
IceUtil::UniquePtr<ApplicationHelper> helper;
try
{
- Lock sync(*this);
+ Lock sync(*this);
checkSessionLock(session);
waitForUpdate(update.descriptor.name);
@@ -760,7 +760,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
update.updateUser = _lockUserId;
update.revision = oldApp.revision + 1;
update.descriptor = helper->diff(*previous);
-
+
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
catch(const DatabaseException& ex)
@@ -772,8 +772,8 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool n
}
void
-Database::instantiateServer(const string& application,
- const string& node,
+Database::instantiateServer(const string& application,
+ const string& node,
const ServerInstanceDescriptor& instance,
AdminSessionI* session)
{
@@ -790,7 +790,7 @@ Database::instantiateServer(const string& application,
checkSessionLock(session);
waitForUpdate(application);
-
+
StringApplicationInfoDict::const_iterator i = _applications.find(application);
if(i == _applications.end())
{
@@ -832,7 +832,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
waitForUpdate(name);
ApplicationInfo appInfo;
-
+
StringApplicationInfoDict::const_iterator i = _applications.find(name);
if(i == _applications.end())
{
@@ -855,7 +855,7 @@ Database::removeApplication(const string& name, AdminSessionI* session, Ice::Lon
throw;
}
}
-
+
dbSerial = removeApplication(name, _connection, dbSerial);
startUpdating(name, appInfo.uuid, appInfo.revision);
@@ -905,7 +905,7 @@ Database::getAllApplications(const string& expression)
void
Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
- const string& uuid,
+ const string& uuid,
int revision)
{
Lock sync(*this);
@@ -957,7 +957,7 @@ Database::getServer(const string& id) const
return _serverCache.get(id);
}
-AllocatableObjectCache&
+AllocatableObjectCache&
Database::getAllocatableObjectCache()
{
return _allocatableObjectCache;
@@ -970,7 +970,7 @@ Database::getAllocatableObject(const Ice::Identity& id) const
}
void
-Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
+Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
Ice::Long dbSerial)
{
assert(dbSerial != 0 || _master);
@@ -1039,7 +1039,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
}
out << " (serial = `" << dbSerial << "')";
}
-
+
if(proxy)
{
if(updated)
@@ -1151,7 +1151,7 @@ Database::removeAdapter(const string& adapterId)
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "' (serial = `" << dbSerial << "')";
}
-
+
if(infos.empty())
{
serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
@@ -1174,13 +1174,13 @@ Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId,
return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate);
}
-void
-Database::getLocatorAdapterInfo(const string& id,
+void
+Database::getLocatorAdapterInfo(const string& id,
const Ice::ConnectionPtr& connection,
const Ice::Context& context,
- LocatorAdapterInfoSeq& adpts,
- int& count,
- bool& replicaGroup,
+ LocatorAdapterInfoSeq& adpts,
+ int& count,
+ bool& replicaGroup,
bool& roundRobin,
const set<string>& excludes)
{
@@ -1189,7 +1189,7 @@ Database::getLocatorAdapterInfo(const string& id,
Lock sync(*this); // Make sure this isn't call during an update.
_adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes);
}
-
+
if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty())
{
vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter);
@@ -1200,12 +1200,12 @@ Database::getLocatorAdapterInfo(const string& id,
{
adapterIds.push_back(q->id);
}
-
+
for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
{
adapterIds = (*q)->filter(id, adapterIds, connection, context);
}
-
+
LocatorAdapterInfoSeq filteredAdpts;
filteredAdpts.reserve(adpts.size());
for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
@@ -1225,7 +1225,7 @@ Database::getLocatorAdapterInfo(const string& id,
}
bool
-Database::addAdapterSyncCallback(const string& id,
+Database::addAdapterSyncCallback(const string& id,
const SynchronizationCallbackPtr& callback,
const std::set<std::string>& excludes)
{
@@ -1291,7 +1291,7 @@ Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con
ReplicaGroupEntryPtr replicaGroup;
{
Lock sync(*this); // Make sure this isn't call during an update.
-
+
AdapterEntryPtr entry = _adapterCache.get(id);
infos = entry->getAdapterInfo();
replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry);
@@ -1423,14 +1423,14 @@ Database::addObject(const ObjectInfo& info)
int serial = 0;
{
- Lock sync(*this);
+ Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
-
+
if(_objectCache.has(id))
{
throw ObjectExistsException(id);
}
-
+
Ice::Long dbSerial = 0;
for(;;)
{
@@ -1456,7 +1456,7 @@ Database::addObject(const ObjectInfo& info)
halt(_communicator, ex);
}
}
-
+
serial = _objectObserverTopic->objectAdded(dbSerial, info);
if(_traceLevels->object > 0)
@@ -1475,14 +1475,14 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
int serial = 0; // Initialize to prevent warning.
{
- Lock sync(*this);
+ Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
-
+
if(_objectCache.has(id))
{
throw ObjectExistsException(id);
}
-
+
bool update = false;
for(;;)
{
@@ -1512,7 +1512,7 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
halt(_communicator, ex);
}
}
-
+
if(update)
{
serial = _objectObserverTopic->objectUpdated(dbSerial, info);
@@ -1521,7 +1521,7 @@ Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
{
serial = _objectObserverTopic->objectAdded(dbSerial, info);
}
-
+
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
@@ -1548,7 +1548,7 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
ex.reason += "'";
throw ex;
}
-
+
for(;;)
{
try
@@ -1561,8 +1561,8 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
ex.id = id;
throw ex;
}
-
- _objects.erase(i);
+
+ _objects.erase(i);
dbSerial = updateSerial(_connection, objectsDbName, dbSerial);
txHolder.commit();
break;
@@ -1578,7 +1578,7 @@ Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
}
serial = _objectObserverTopic->objectRemoved(dbSerial, id);
-
+
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
@@ -1595,8 +1595,8 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
int serial = 0;
{
- Lock sync(*this);
-
+ Lock sync(*this);
+
const Ice::Identity id = proxy->ice_getIdentity();
if(_objectCache.has(id))
{
@@ -1607,7 +1607,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
ex.reason += "'";
throw ex;
}
-
+
ObjectInfo info;
Ice::Long dbSerial = 0;
for(;;)
@@ -1615,7 +1615,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
try
{
TransactionHolder txHolder(_connection);
- IdentityObjectInfoDict::iterator i = _objects.find(id);
+ IdentityObjectInfoDict::iterator i = _objects.find(id);
if(i == _objects.end())
{
ObjectNotRegisteredException ex;
@@ -1638,7 +1638,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
halt(_communicator, ex);
}
}
-
+
serial = _objectObserverTopic->objectUpdated(dbSerial, info);
if(_traceLevels->object > 0)
{
@@ -1743,7 +1743,7 @@ Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, con
}
Ice::ObjectPrx
-Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
+Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
const Ice::Context& ctx)
{
Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
@@ -1969,10 +1969,10 @@ Database::checkForAddition(const ApplicationHelper& app, const ConnectionPtr& co
app.getReplicaGroups(repGrps, adptRepGrps);
for_each(adptRepGrps.begin(), adptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
}
-
+
void
-Database::checkForUpdate(const ApplicationHelper& origApp,
- const ApplicationHelper& newApp,
+Database::checkForUpdate(const ApplicationHelper& origApp,
+ const ApplicationHelper& newApp,
const ConnectionPtr& connection)
{
set<string> oldSvrs, newSvrs;
@@ -1982,7 +1982,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
origApp.getIds(oldSvrs, oldAdpts, oldObjs);
newApp.getIds(newSvrs, newAdpts, newObjs);
- Ice::StringSeq addedSvrs;
+ Ice::StringSeq addedSvrs;
set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs));
for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition));
@@ -2012,7 +2012,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
set<string> oldAdptRepGrps, newAdptRepGrps;
origApp.getReplicaGroups(oldRepGrps, oldAdptRepGrps);
newApp.getReplicaGroups(newRepGrps, newAdptRepGrps);
-
+
set<string> rmRepGrps;
set_difference(oldRepGrps.begin(), oldRepGrps.end(), newRepGrps.begin(),newRepGrps.end(), set_inserter(rmRepGrps));
for_each(rmRepGrps.begin(), rmRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
@@ -2023,7 +2023,7 @@ Database::checkForUpdate(const ApplicationHelper& origApp,
for_each(addedAdptRepGrps.begin(), addedAdptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
vector<string> invalidAdptRepGrps;
- set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(),
+ set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(),
back_inserter(invalidAdptRepGrps));
if(!invalidAdptRepGrps.empty())
{
@@ -2048,7 +2048,7 @@ Database::checkServerForAddition(const string& id)
if(_serverCache.has(id))
{
DeploymentException ex;
- ex.reason = "server `" + id + "' is already registered";
+ ex.reason = "server `" + id + "' is already registered";
throw ex;
}
}
@@ -2080,7 +2080,7 @@ Database::checkAdapterForAddition(const string& id, const StringAdapterInfoDict&
if(found)
{
DeploymentException ex;
- ex.reason = "adapter `" + id + "' is already registered";
+ ex.reason = "adapter `" + id + "' is already registered";
throw ex;
}
}
@@ -2105,7 +2105,7 @@ Database::checkObjectForAddition(const Ice::Identity& objectId, const IdentityOb
if(found)
{
DeploymentException ex;
- ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered";
+ ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered";
throw ex;
}
}
@@ -2146,16 +2146,16 @@ Database::checkReplicaGroupForRemove(const string& replicaGroup)
{
//
// This would indicate an inconsistency with the cache and
- // database. We don't print an error, it will be printed
+ // database. We don't print an error, it will be printed
// when the application is actually removed.
//
return;
}
-
+
if(entry->hasAdaptersFromOtherApplications())
{
DeploymentException ex;
- ex.reason = "couldn't remove application because the replica group `" + replicaGroup +
+ ex.reason = "couldn't remove application because the replica group `" + replicaGroup +
"' is used by object adapters from other applications.";
throw ex;
}
@@ -2168,7 +2168,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
const string application = app.getInstance().name;
for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
{
- _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
+ _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
}
const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
@@ -2185,7 +2185,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- entries.push_back(_serverCache.add(p->second, false));
+ entries.push_back(_serverCache.add(p->second));
}
}
@@ -2195,7 +2195,7 @@ Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
map<string, ServerInfo> servers = app.getServerInfos("", 0);
for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- entries.push_back(_serverCache.remove(p->first));
+ entries.push_back(_serverCache.remove(p->first, false));
}
const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
@@ -2217,11 +2217,11 @@ Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
}
void
-Database::reload(const ApplicationHelper& oldApp,
- const ApplicationHelper& newApp,
- ServerEntrySeq& entries,
- const string& uuid,
- int revision,
+Database::reload(const ApplicationHelper& oldApp,
+ const ApplicationHelper& newApp,
+ ServerEntrySeq& entries,
+ const string& uuid,
+ int revision,
bool noRestart)
{
const string application = oldApp.getInstance().name;
@@ -2231,18 +2231,18 @@ Database::reload(const ApplicationHelper& oldApp,
//
map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision);
- vector<ServerInfo> load;
+ vector<pair<bool, ServerInfo> > load;
for(map<string, ServerInfo>::const_iterator p = newServers.begin(); p != newServers.end(); ++p)
{
map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
if(q == oldServers.end())
{
- load.push_back(p->second);
- }
+ load.push_back(make_pair(false, p->second));
+ }
else if(isServerUpdated(p->second, q->second))
{
- _serverCache.remove(p->first, false); // Don't destroy the server if it was updated.
- load.push_back(p->second);
+ _serverCache.preUpdate(p->second, noRestart);
+ load.push_back(make_pair(true, p->second));
}
else
{
@@ -2256,7 +2256,7 @@ Database::reload(const ApplicationHelper& oldApp,
map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
if(q == newServers.end())
{
- entries.push_back(_serverCache.remove(p->first, true, noRestart));
+ entries.push_back(_serverCache.remove(p->first, noRestart));
}
}
@@ -2328,9 +2328,16 @@ Database::reload(const ApplicationHelper& oldApp,
//
// Add back servers.
//
- for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q)
+ for(vector<pair<bool, ServerInfo> >::const_iterator q = load.begin(); q != load.end(); ++q)
{
- entries.push_back(_serverCache.add(*q, noRestart));
+ if(q->first) // Update
+ {
+ entries.push_back(_serverCache.postUpdate(q->second, noRestart));
+ }
+ else
+ {
+ entries.push_back(_serverCache.add(q->second));
+ }
}
}
@@ -2357,7 +2364,7 @@ Database::saveApplication(const ApplicationInfo& info, const ConnectionPtr& conn
{
halt(_communicator, ex);
}
- }
+ }
return dbSerial;
}
@@ -2384,7 +2391,7 @@ Database::removeApplication(const string& name, const ConnectionPtr& connection,
{
halt(_communicator, ex);
}
- }
+ }
return dbSerial;
}
@@ -2441,8 +2448,8 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
if(q != oldServers.end() && isServerUpdated(p->second, q->second))
{
- if(noRestart &&
- p->second.node == q->second.node &&
+ if(noRestart &&
+ p->second.node == q->second.node &&
isServerUpdated(p->second, q->second, true)) // Ignore properties
{
//
@@ -2584,7 +2591,7 @@ Database::checkUpdate(const ApplicationHelper& oldApp,
void
Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
const ApplicationInfo& oldApp,
- const ApplicationHelper& previous,
+ const ApplicationHelper& previous,
const ApplicationHelper& helper,
AdminSessionI* /*session*/,
bool noRestart,
@@ -2670,7 +2677,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
newUpdate.updateUser = _lockUserId;
newUpdate.revision = info.revision;
newUpdate.descriptor = helper.diff(previous);
-
+
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
assert(p != _updating.end());
p->unmarkUpdated();