summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp56
1 files changed, 30 insertions, 26 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index ac63076458c..1f777a3675f 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -119,7 +119,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
const IceStorm::TopicManagerPrx& topicManager,
const string& instanceName,
int sessionTimeout,
- const TraceLevelsPtr& traceLevels) :
+ const TraceLevelsPtr& traceLevels,
+ bool master) :
_communicator(registryAdapter->getCommunicator()),
_internalAdapter(registryAdapter),
_topicManager(topicManager),
@@ -127,7 +128,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_instanceName(instanceName),
_traceLevels(traceLevels),
_sessionTimeout(sessionTimeout),
- _master(_communicator->getProperties()->getProperty("IceGrid.Registry.ReplicaName").empty()),
+ _master(master),
_replicaCache(_communicator, topicManager),
_nodeCache(_communicator, _replicaCache, _master),
_objectCache(_communicator),
@@ -165,6 +166,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
+
_applicationObserverTopic = new ApplicationObserverTopic(_topicManager, _applications);
_adapterObserverTopic = new AdapterObserverTopic(_topicManager, _adapters);
_objectObserverTopic = new ObjectObserverTopic(_topicManager, _objects);
@@ -391,24 +393,19 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
checkForAddition(helper);
load(helper, entries, info.uuid, info.revision);
_updating.insert(info.descriptor.name);
+
+ _replicaCache.startApplicationReplication(info.uuid, info.revision);
}
- //
- // If the update is from an admin session, we synchronize the
- // servers and throw if there's errors.
- //
- if(session)
+ if(_master)
{
- //
- // Synchronize the servers on the nodes. If a server couldn't be
- // deployed we unload the application and throw.
- //
try
{
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
catch(const DeploymentException& ex)
{
+ try
{
Lock sync(*this);
entries.clear();
@@ -416,21 +413,14 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
_updating.erase(info.descriptor.name);
notifyAll();
}
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- }
- catch(const DeploymentException&)
+ catch(const DeploymentException& ex)
{
- // TODO: warning?
+ Ice::Error err(_traceLevels->logger);
+ err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
}
throw ex;
}
}
- else
- {
- // TODO: XXX: Synchronize the servers here?!
- }
//
// Save the application descriptor.
@@ -446,6 +436,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
_applicationObserverTopic->applicationAdded(serial, info);
_replicaCache.waitForUpdateReplication("application", serial);
+ _replicaCache.finishApplicationReplication(info.uuid, info.revision);
if(_traceLevels->application > 0)
{
@@ -491,6 +482,7 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
newDesc = helper.getDefinition();
_updating.insert(update.descriptor.name);
+ _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
}
finishUpdate(entries, update, oldApp, newDesc, session);
@@ -532,6 +524,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
_updating.insert(update.descriptor.name);
+ _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
}
finishUpdate(entries, update, oldApp, newDesc, session);
@@ -579,6 +572,7 @@ Database::instantiateServer(const string& application,
newDesc = helper.getDefinition();
_updating.insert(update.descriptor.name);
+ _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
}
finishUpdate(entries, update, oldApp, newDesc, session);
@@ -752,9 +746,15 @@ Database::getReplicaInfo(const string& name) const
}
void
-Database::replicaReceivedUpdate(const string& name, const string& update, int serial)
+Database::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure)
+{
+ _replicaCache.replicaReceivedUpdate(name, update, serial, failure);
+}
+
+void
+Database::waitForApplicationReplication(const string& application, int revision)
{
- _replicaCache.replicaReceivedUpdate(name, update, serial);
+ _replicaCache.waitForApplicationReplication(application, revision);
}
void
@@ -1754,7 +1754,7 @@ Database::finishUpdate(ServerEntrySeq& entries,
const ApplicationDescriptor& newDesc,
AdminSessionI* session)
{
- if(session)
+ if(_master)
{
//
// Synchronize the servers on the nodes. If a server couldn't be
@@ -1766,6 +1766,7 @@ Database::finishUpdate(ServerEntrySeq& entries,
}
catch(const DeploymentException& ex)
{
+ ApplicationUpdateInfo newUpdate;
{
Lock sync(*this);
entries.clear();
@@ -1775,13 +1776,15 @@ Database::finishUpdate(ServerEntrySeq& entries,
_updating.erase(newDesc.name);
notifyAll();
}
+
try
{
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
- catch(const DeploymentException&)
+ catch(const DeploymentException& ex)
{
- // TODO: warning?
+ Ice::Error err(_traceLevels->logger);
+ err << "failed to rollback previous application `" << oldApp.descriptor.name << "':\n" << ex.reason;
}
throw ex;
}
@@ -1808,6 +1811,7 @@ Database::finishUpdate(ServerEntrySeq& entries,
_applicationObserverTopic->applicationUpdated(serial, update);
_replicaCache.waitForUpdateReplication("application", serial);
+ _replicaCache.finishApplicationReplication(oldApp.uuid, update.revision);
if(_traceLevels->application > 0)
{