summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp124
1 files changed, 77 insertions, 47 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 5f74ff08031..9ea5c095520 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -409,9 +409,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
if(_master)
{
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
try
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
+ {
+ try
+ {
+ (*p)->waitForSync();
+ }
+ catch(const NodeUnreachableException&)
+ {
+ // Ignore.
+ }
+ }
}
catch(const DeploymentException& ex)
{
@@ -680,7 +691,7 @@ Database::removeApplication(const string& name, AdminSessionI* session)
if(_master)
{
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
}
{
@@ -731,7 +742,7 @@ Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdat
Lock sync(*this);
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), make_pair(uuid, revision));
- if(p != _updating.end())
+ if(p != _updating.end() && !p->updated)
{
p->cbs.push_back(cb);
}
@@ -977,7 +988,7 @@ Database::removeAdapter(const string& adapterId)
AdapterPrx
Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, bool upToDate)
{
- Lock sync(*this); // make sure this isn't call during an update.
+ Lock sync(*this); // Make sure this isn't call during an update.
return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate);
}
@@ -993,6 +1004,13 @@ Database::getLocatorAdapterInfo(const string& id,
_adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, excludes);
}
+bool
+Database::addAdapterSyncCallback(const string& id, const SynchronizationCallbackPtr& callback)
+{
+ Lock sync(*this); // Make sure this isn't call during an update.
+ return _adapterCache.get(id)->addSyncCallback(callback);
+}
+
AdapterInfoSeq
Database::getAdapterInfo(const string& id)
{
@@ -1036,7 +1054,6 @@ Database::getAdapterInfo(const string& id)
return infos;
}
-
Ice::StringSeq
Database::getAllAdapters(const string& expression)
{
@@ -2068,15 +2085,42 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
const ApplicationDescriptor& newDesc,
AdminSessionI* session)
{
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+
+ int serial;
+ {
+ Lock sync(*this);
+ ++_applicationSerial;
+ serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+ }
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
+
+ //
+ // Mark the application as updated. All the replicas received the update so it's now safe
+ // for the nodes to start the servers.
+ //
+ {
+ Lock sync(*this);
+ vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
+ assert(p != _updating.end());
+ p->markUpdated();
+ }
+
if(_master)
{
- //
- // Load the servers on the nodes. If a server couldn't be
- // deployed we unload the application and throw.
- //
try
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
+ {
+ try
+ {
+ (*p)->waitForSync();
+ }
+ catch(const NodeUnreachableException&)
+ {
+ // Ignore.
+ }
+ }
}
catch(const DeploymentException& ex)
{
@@ -2086,47 +2130,37 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
entries.clear();
ApplicationHelper previous(_communicator, newDesc);
ApplicationHelper helper(_communicator, oldApp.descriptor);
- saveApplication(oldApp, _databaseCache->getConnection());
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
- }
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ ApplicationInfo info = oldApp;
+ info.revision = update.revision + 1;
+ saveApplication(info, _databaseCache->getConnection());
+ reload(previous, helper, entries, info.uuid, info.revision);
+
+ newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
+ newUpdate.updateUser = _lockUserId;
+ newUpdate.revision = info.revision;
+ newUpdate.descriptor = helper.diff(previous);
+
+ vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
+ assert(p != _updating.end());
+ p->unmarkUpdated();
}
- catch(const DeploymentException& ex)
+
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
+
+ int serial;
{
- Ice::Error err(_traceLevels->logger);
- err << "failed to rollback previous application `" << oldApp.descriptor.name << "':\n" << ex.reason;
+ Lock sync(*this);
+ ++_applicationSerial;
+ serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, newUpdate);
}
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
finishUpdating(newDesc.name);
throw ex;
}
}
- else
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- }
-
- //
- // Save the application descriptor.
- //
- int serial;
- {
- Lock sync(*this);
- ++_applicationSerial;
-
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.descriptor.name << "'";
- }
-
- serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
- }
-
- _applicationObserverTopic->waitForSyncedSubscribers(serial);
finishUpdating(update.descriptor.name);
}
@@ -2155,11 +2189,7 @@ Database::finishUpdating(const string& name)
vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), name);
assert(p != _updating.end());
- for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->cbs.begin(); q != p->cbs.end(); ++q)
- {
- (*q)->ice_response();
- }
+ p->markUpdated();
_updating.erase(p);
-
notifyAll();
}