summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
commit91f6ebb998532b36fc70187b641a5b7404060422 (patch)
treeac88e961c68e4b09eb819f4b57b9ecac56854567 /cpp/src/IceGrid/Database.cpp
parentICE-5378 - Remove slice35d.dll from Windows installer (diff)
downloadice-91f6ebb998532b36fc70187b641a5b7404060422.tar.bz2
ice-91f6ebb998532b36fc70187b641a5b7404060422.tar.xz
ice-91f6ebb998532b36fc70187b641a5b7404060422.zip
Fixed ICE-5358 - allow IceGrid replica to initialize its database from another replica
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp382
1 files changed, 259 insertions, 123 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index f822bf576c8..14e454df6d5 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -43,6 +43,17 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
}
};
+template<typename K, typename V> vector<V>
+toVector(const map<K,V>& m)
+{
+ vector<V> v;
+ for(typename map<K,V>::const_iterator p = m.begin(); p != m.end(); ++p)
+ {
+ v.push_back(p->second);
+ }
+ return v;
+}
+
void
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
@@ -78,8 +89,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
_connectionPool(plugin->getConnectionPool()),
_databasePlugin(plugin),
- _lock(0),
- _applicationSerial(0)
+ _lock(0)
{
ServerEntrySeq entries;
@@ -108,13 +118,9 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
- _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applications);
-
- AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
- _adapterObserverTopic = new AdapterObserverTopic(_topicManager, adaptersWrapper->getMap());
-
- ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
- _objectObserverTopic = new ObjectObserverTopic(_topicManager, objectsWrapper->getMap());
+ _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, applicationsWrapper);
+ _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _connectionPool->getAdapters(connection));
+ _objectObserverTopic = new ObjectObserverTopic(_topicManager, _connectionPool->getObjects(connection));
_registryObserverTopic->registryUp(info);
}
@@ -188,8 +194,8 @@ Database::lock(AdminSessionI* session, const string& userId)
_lock = session;
_lockUserId = userId;
-
- return _applicationSerial;
+
+ return _applicationObserverTopic->getSerial();
}
void
@@ -206,9 +212,10 @@ Database::unlock(AdminSessionI* session)
}
void
-Database::syncApplications(const ApplicationInfoSeq& newApplications)
+Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long dbSerial)
{
- int serial = 0; // Initialize to prevent warning.
+ assert(dbSerial != 0);
+ int serial = 0;
{
Lock sync(*this);
@@ -226,6 +233,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications)
{
applicationsWrapper->put(p->descriptor.name, *p);
}
+ dbSerial = applicationsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -274,16 +282,24 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications)
}
}
- ++_applicationSerial;
- serial = _applicationObserverTopic->applicationInit(_applicationSerial, newApplications);
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "synchronized applications (serial = `" << dbSerial << "')";
+ }
+
+ serial = _applicationObserverTopic->applicationInit(dbSerial, newApplications);
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
}
void
-Database::syncAdapters(const AdapterInfoSeq& adapters)
+Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
{
- int serial;
+ assert(dbSerial != 0);
+ int serial = 0;
{
Lock sync(*this);
for(;;)
@@ -298,6 +314,7 @@ Database::syncAdapters(const AdapterInfoSeq& adapters)
{
adaptersWrapper->put(r->id, *r);
}
+ dbSerial = adaptersWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -310,15 +327,23 @@ Database::syncAdapters(const AdapterInfoSeq& adapters)
halt(_communicator, ex);
}
}
- serial = _adapterObserverTopic->adapterInit(adapters);
+
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "synchronized adapters (serial = `" << dbSerial << "')";
+ }
+
+ serial = _adapterObserverTopic->adapterInit(dbSerial, adapters);
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
void
-Database::syncObjects(const ObjectInfoSeq& objects)
+Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
{
- int serial;
+ assert(dbSerial != 0);
+ int serial = 0;
{
Lock sync(*this);
for(;;)
@@ -333,6 +358,7 @@ Database::syncObjects(const ObjectInfoSeq& objects)
{
objectsWrapper->put(q->proxy->ice_getIdentity(), *q);
}
+ dbSerial = objectsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -345,14 +371,102 @@ Database::syncObjects(const ObjectInfoSeq& objects)
halt(_communicator, ex);
}
}
- serial = _objectObserverTopic->objectInit(objects);
+
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "synchronized objects (serial = `" << dbSerial << "')";
+ }
+
+ serial = _objectObserverTopic->objectInit(dbSerial, objects);
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
+ApplicationInfoSeq
+Database::getApplications(Ice::Long& serial) const
+{
+ for(;;)
+ {
+ try
+ {
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ TransactionHolder txHolder(connection);
+ ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
+ serial = applicationsWrapper->getSerial();
+ return toVector(applicationsWrapper->getMap());
+ }
+ catch(const DeadlockException&)
+ {
+ continue;
+ }
+ catch(const DatabaseException& ex)
+ {
+ halt(_communicator, ex);
+ }
+ }
+}
+
+AdapterInfoSeq
+Database::getAdapters(Ice::Long& serial) const
+{
+ for(;;)
+ {
+ try
+ {
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ TransactionHolder txHolder(connection);
+ AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
+ serial = adaptersWrapper->getSerial();
+ return toVector(adaptersWrapper->getMap());
+ }
+ catch(const DeadlockException&)
+ {
+ continue;
+ }
+ catch(const DatabaseException& ex)
+ {
+ halt(_communicator, ex);
+ }
+ }
+}
+
+ObjectInfoSeq
+Database::getObjects(Ice::Long& serial) const
+{
+ for(;;)
+ {
+ try
+ {
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ TransactionHolder txHolder(connection);
+ ObjectsWrapperPtr objectsWrapper = _connectionPool->getObjects(connection);
+ serial = objectsWrapper->getSerial();
+ return toVector(objectsWrapper->getMap());
+ }
+ catch(const DeadlockException&)
+ {
+ continue;
+ }
+ catch(const DatabaseException& ex)
+ {
+ halt(_communicator, ex);
+ }
+ }
+}
+
+StringLongDict
+Database::getSerials() const
+{
+ return _connectionPool->getSerials();
+}
+
void
-Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
+Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ice::Long dbSerial)
{
+ assert(dbSerial != 0 || _master);
+
+ int serial = 0; // Initialize to prevent warning.
ServerEntrySeq entries;
try
{
@@ -374,18 +488,33 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
ApplicationHelper helper(_communicator, info.descriptor, true);
checkForAddition(helper, connection);
- saveApplication(info, connection);
+ dbSerial = saveApplication(info, connection, dbSerial);
load(helper, entries, info.uuid, info.revision);
startUpdating(info.descriptor.name, info.uuid, info.revision);
+
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ serial = _applicationObserverTopic->applicationAdded(dbSerial, info);
}
catch(const DatabaseException& ex)
{
halt(_communicator, ex);
}
+ _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
+
+ //
+ // Mark the application as updated. All the replicas received the update so it's now safe
+ // for the nodes to start the servers.
+ //
+ {
+ Lock sync(*this);
+ vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), info.descriptor.name);
+ assert(p != _updating.end());
+ p->markUpdated();
+ }
+
if(_master)
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
try
{
for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
@@ -407,7 +536,10 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
Lock sync(*this);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
- removeApplication(info.descriptor.name, _connectionPool->getConnection());
+ dbSerial = removeApplication(info.descriptor.name, _connectionPool->getConnection());
+
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
}
catch(const DeploymentException& ex)
{
@@ -418,32 +550,27 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
halt(_communicator, ex);
}
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
finishUpdating(info.descriptor.name);
throw ex;
}
}
- int serial;
+ if(_traceLevels->application > 0)
{
- Lock sync(*this);
- ++_applicationSerial;
- serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info);
-
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "added application `" << info.descriptor.name << "'";
- }
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "added application `" << info.descriptor.name << "'";
}
-
- _applicationObserverTopic->waitForSyncedSubscribers(serial);
-
finishUpdating(info.descriptor.name);
}
void
-Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session)
+Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session,
+ Ice::Long dbSerial)
{
+ assert(dbSerial != 0 || _master);
+
ApplicationInfo oldApp;
ApplicationUpdateInfo update = updt;
IceUtil::UniquePtr<ApplicationHelper> previous;
@@ -481,12 +608,14 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, A
halt(_communicator, ex);
}
- finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
+ finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial);
}
void
Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool noRestart, AdminSessionI* session)
{
+ assert(_master);
+
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
IceUtil::UniquePtr<ApplicationHelper> previous;
@@ -533,6 +662,8 @@ Database::instantiateServer(const string& application,
const ServerInstanceDescriptor& instance,
AdminSessionI* session)
{
+ assert(_master);
+
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
IceUtil::UniquePtr<ApplicationHelper> previous;
@@ -575,11 +706,12 @@ Database::instantiateServer(const string& application,
}
void
-Database::removeApplication(const string& name, AdminSessionI* session)
+Database::removeApplication(const string& name, AdminSessionI* session, Ice::Long dbSerial)
{
+ assert(dbSerial != 0 || _master);
ServerEntrySeq entries;
- int serial;
+ int serial = 0; // Initialize to prevent warning.
try
{
Lock sync(*this);
@@ -605,7 +737,6 @@ Database::removeApplication(const string& name, AdminSessionI* session)
ApplicationHelper helper(_communicator, appInfo.descriptor);
init = true;
checkForRemove(helper);
- removeApplication(name, connection);
unload(helper, entries);
}
catch(const DeploymentException&)
@@ -614,42 +745,31 @@ Database::removeApplication(const string& name, AdminSessionI* session)
{
throw;
}
-
- //
- // For some reasons the application became invalid. If
- // it's invalid, it's most likely not loaded either. So we
- // ignore the error and erase the descriptor.
- //
- removeApplication(name, connection);
}
+ dbSerial = removeApplication(name, connection, dbSerial);
startUpdating(name, appInfo.uuid, appInfo.revision);
+
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ serial = _applicationObserverTopic->applicationRemoved(dbSerial, name);
}
catch(const DatabaseException& ex)
{
halt(_communicator, ex);
}
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
if(_master)
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
}
+ if(_traceLevels->application > 0)
{
- Lock sync(*this);
- ++_applicationSerial;
- serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
-
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "removed application `" << name << "'";
- }
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "removed application `" << name << "'";
}
- _applicationObserverTopic->waitForSyncedSubscribers(serial);
-
finishUpdating(name);
}
@@ -743,9 +863,12 @@ Database::getAllocatableObject(const Ice::Identity& id) const
}
void
-Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
+Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
+ Ice::Long dbSerial)
{
- int serial = 0;
+ assert(dbSerial != 0 || _master);
+
+ int serial = 0; // Initialize to prevent warning.
{
Lock sync(*this);
if(_adapterCache.has(adapterId))
@@ -783,6 +906,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
adaptersWrapper->erase(adapterId);
}
+ dbSerial = adaptersWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -810,16 +934,16 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
if(updated)
{
- serial = _adapterObserverTopic->adapterUpdated(info);
+ serial = _adapterObserverTopic->adapterUpdated(dbSerial, info);
}
else
{
- serial = _adapterObserverTopic->adapterAdded(info);
+ serial = _adapterObserverTopic->adapterAdded(dbSerial, info);
}
}
else
{
- serial = _adapterObserverTopic->adapterRemoved(adapterId);
+ serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
}
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
@@ -859,6 +983,8 @@ Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& en
void
Database::removeAdapter(const string& adapterId)
{
+ assert(_master);
+
int serial = 0; // Initialize to prevent warning.
{
Lock sync(*this);
@@ -870,8 +996,9 @@ Database::removeAdapter(const string& adapterId)
ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
throw ex;
}
-
+
AdapterInfoSeq infos;
+ Ice::Long dbSerial = 0;
for(;;)
{
try
@@ -897,6 +1024,7 @@ Database::removeAdapter(const string& adapterId)
adaptersWrapper->put(p->id, *p);
}
}
+ dbSerial = adaptersWrapper->updateSerial();
txHolder.commit();
break;
}
@@ -909,6 +1037,7 @@ Database::removeAdapter(const string& adapterId)
halt(_communicator, ex);
}
}
+
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
@@ -917,13 +1046,13 @@ Database::removeAdapter(const string& adapterId)
if(infos.empty())
{
- serial = _adapterObserverTopic->adapterRemoved(adapterId);
+ serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
}
else
{
for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
- serial = _adapterObserverTopic->adapterUpdated(*p);
+ serial = _adapterObserverTopic->adapterUpdated(dbSerial, *p);
}
}
}
@@ -1039,7 +1168,9 @@ Database::getAllAdapters(const string& expression)
void
Database::addObject(const ObjectInfo& info)
{
- int serial;
+ assert(_master);
+
+ int serial = 0;
{
Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
@@ -1049,6 +1180,7 @@ Database::addObject(const ObjectInfo& info)
throw ObjectExistsException(id);
}
+ Ice::Long dbSerial = 0;
for(;;)
{
try
@@ -1065,6 +1197,7 @@ Database::addObject(const ObjectInfo& info)
{
}
objectsWrapper->put(id, info);
+ dbSerial = objectsWrapper->updateSerial();
txHolder.commit();
break;
}
@@ -1078,7 +1211,7 @@ Database::addObject(const ObjectInfo& info)
}
}
- serial = _objectObserverTopic->objectAdded(info);
+ serial = _objectObserverTopic->objectAdded(dbSerial, info);
if(_traceLevels->object > 0)
{
@@ -1090,9 +1223,11 @@ Database::addObject(const ObjectInfo& info)
}
void
-Database::addOrUpdateObject(const ObjectInfo& info)
+Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
{
- int serial;
+ assert(dbSerial != 0 || _master);
+
+ int serial = 0; // Initialize to prevent warning.
{
Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
@@ -1119,6 +1254,7 @@ Database::addOrUpdateObject(const ObjectInfo& info)
{
}
objectsWrapper->put(id, info);
+ dbSerial = objectsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -1134,11 +1270,11 @@ Database::addOrUpdateObject(const ObjectInfo& info)
if(update)
{
- serial = _objectObserverTopic->objectUpdated(info);
+ serial = _objectObserverTopic->objectUpdated(dbSerial, info);
}
else
{
- serial = _objectObserverTopic->objectAdded(info);
+ serial = _objectObserverTopic->objectAdded(dbSerial, info);
}
if(_traceLevels->object > 0)
@@ -1151,9 +1287,11 @@ Database::addOrUpdateObject(const ObjectInfo& info)
}
void
-Database::removeObject(const Ice::Identity& id)
+Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
{
- int serial;
+ assert(dbSerial != 0 || _master);
+
+ int serial = 0; // Initialize to prevent warning.
{
Lock sync(*this);
if(_objectCache.has(id))
@@ -1184,7 +1322,8 @@ Database::removeObject(const Ice::Identity& id)
throw ex;
}
- objectsWrapper->erase(id);
+ objectsWrapper->erase(id);
+ dbSerial = objectsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -1198,7 +1337,7 @@ Database::removeObject(const Ice::Identity& id)
}
}
- serial = _objectObserverTopic->objectRemoved(id);
+ serial = _objectObserverTopic->objectRemoved(dbSerial, id);
if(_traceLevels->object > 0)
{
@@ -1212,7 +1351,9 @@ Database::removeObject(const Ice::Identity& id)
void
Database::updateObject(const Ice::ObjectPrx& proxy)
{
- int serial;
+ assert(_master);
+
+ int serial = 0;
{
Lock sync(*this);
@@ -1228,6 +1369,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
ObjectInfo info;
+ Ice::Long dbSerial = 0;
for(;;)
{
try
@@ -1248,6 +1390,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
info.proxy = proxy;
objectsWrapper->put(id, info);
+ dbSerial = objectsWrapper->updateSerial();
txHolder.commit();
break;
}
@@ -1261,7 +1404,7 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
}
- serial = _objectObserverTopic->objectUpdated(info);
+ serial = _objectObserverTopic->objectUpdated(dbSerial, info);
if(_traceLevels->object > 0)
{
@@ -1273,10 +1416,9 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
int
-Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
+Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
Lock sync(*this);
-
for(;;)
{
try
@@ -1300,15 +1442,13 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
halt(_communicator, ex);
}
}
- int serial = _objectObserverTopic->objectsAddedOrUpdated(objects);
- return serial;
+ return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects);
}
-void
-Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
+int
+Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
{
Lock sync(*this);
-
for(;;)
{
try
@@ -1332,7 +1472,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
halt(_communicator, ex);
}
}
- _objectObserverTopic->objectsRemoved(objects);
+ return _objectObserverTopic->wellKnownObjectsRemoved(objects);
}
Ice::ObjectPrx
@@ -1924,7 +2064,6 @@ Database::reload(const ApplicationHelper& oldApp,
// Remove all the node descriptors.
//
const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes;
-
for(NodeDescriptorDict::const_iterator n = oldNodes.begin(); n != oldNodes.end(); ++n)
{
_nodeCache.get(n->first)->removeDescriptor(application);
@@ -1970,9 +2109,10 @@ Database::reload(const ApplicationHelper& oldApp,
}
}
-void
-Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection)
+Ice::Long
+Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionPtr& connection, Ice::Long dbSerial)
{
+ assert(dbSerial != 0 || _master);
for(;;)
{
try
@@ -1980,6 +2120,7 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP
ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
TransactionHolder txHolder(connection);
applicationsWrapper->put(info.descriptor.name, info);
+ dbSerial = applicationsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -1992,11 +2133,13 @@ Database::saveApplication(const ApplicationInfo& info, const DatabaseConnectionP
halt(_communicator, ex);
}
}
+ return dbSerial;
}
-void
-Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection)
+Ice::Long
+Database::removeApplication(const string& name, const DatabaseConnectionPtr& connection, Ice::Long dbSerial)
{
+ assert(dbSerial != 0 || _master);
for(;;)
{
try
@@ -2004,6 +2147,7 @@ Database::removeApplication(const string& name, const DatabaseConnectionPtr& con
ApplicationsWrapperPtr applicationsWrapper = _connectionPool->getApplications(connection);
TransactionHolder txHolder(connection);
applicationsWrapper->erase(name);
+ dbSerial = applicationsWrapper->updateSerial(dbSerial);
txHolder.commit();
break;
}
@@ -2016,6 +2160,7 @@ Database::removeApplication(const string& name, const DatabaseConnectionPtr& con
halt(_communicator, ex);
}
}
+ return dbSerial;
}
void
@@ -2199,12 +2344,14 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
const ApplicationHelper& previous,
const ApplicationHelper& helper,
AdminSessionI* /*session*/,
- bool noRestart)
+ bool noRestart,
+ Ice::Long dbSerial)
{
const ApplicationDescriptor& newDesc = helper.getDefinition();
DatabaseConnectionPtr connection = _connectionPool->newConnection();
ServerEntrySeq entries;
+ int serial = 0;
try
{
if(_master)
@@ -2215,34 +2362,29 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
Lock sync(*this);
checkForUpdate(previous, helper, connection);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
- }
- catch(const DeploymentException&)
- {
- finishUpdating(update.descriptor.name);
- throw;
- }
-
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- int serial;
- {
- Lock sync(*this);
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
ApplicationInfo info = oldApp;
info.updateTime = update.updateTime;
info.updateUser = update.updateUser;
info.revision = update.revision;
info.descriptor = newDesc;
- saveApplication(info, connection);
+ dbSerial = saveApplication(info, connection, dbSerial);
- ++_applicationSerial;
- serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+ serial = _applicationObserverTopic->applicationUpdated(dbSerial, update);
}
- _applicationObserverTopic->waitForSyncedSubscribers(serial);
+ catch(const DeploymentException&)
+ {
+ finishUpdating(update.descriptor.name);
+ throw;
+ }
+
+ _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
//
// Mark the application as updated. All the replicas received the update so it's now safe
- // for the nodes to start the servers.
+ // for the nodes to start servers.
//
{
Lock sync(*this);
@@ -2278,7 +2420,7 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
ApplicationInfo info = oldApp;
info.revision = update.revision + 1;
- saveApplication(info, connection);
+ dbSerial = saveApplication(info, connection);
reload(previous, helper, entries, info.uuid, info.revision, noRestart);
newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
@@ -2289,19 +2431,13 @@ Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
assert(p != _updating.end());
p->unmarkUpdated();
- }
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- int serial;
- {
- Lock sync(*this);
- ++_applicationSerial;
- serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, newUpdate);
+ serial = _applicationObserverTopic->applicationUpdated(dbSerial, newUpdate);
}
- _applicationObserverTopic->waitForSyncedSubscribers(serial);
-
+ _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for subscriber to be updated.
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
finishUpdating(newDesc.name);
throw ex;
}