summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp311
1 files changed, 221 insertions, 90 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 601a452eb82..3c9f6a004a4 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -43,30 +43,6 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
}
};
-bool
-isServerUpdated(const ServerInfo& lhs, const ServerInfo& rhs)
-{
- if(lhs.node != rhs.node)
- {
- return true;
- }
-
- IceBoxDescriptorPtr lhsIceBox = IceBoxDescriptorPtr::dynamicCast(lhs.descriptor);
- IceBoxDescriptorPtr rhsIceBox = IceBoxDescriptorPtr::dynamicCast(rhs.descriptor);
- if(lhsIceBox && rhsIceBox)
- {
- return IceBoxHelper(lhsIceBox) != IceBoxHelper(rhsIceBox);
- }
- else if(!lhsIceBox && !rhsIceBox)
- {
- return ServerHelper(lhs.descriptor) != ServerHelper(rhs.descriptor);
- }
- else
- {
- return true;
- }
-}
-
void
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
@@ -275,7 +251,7 @@ Database::syncApplications(const ApplicationInfoSeq& newApplications)
{
ApplicationHelper previous(_communicator, q->second.descriptor);
ApplicationHelper helper(_communicator, p->descriptor);
- reload(previous, helper, entries, p->uuid, p->revision);
+ reload(previous, helper, entries, p->uuid, p->revision, false);
}
else
{
@@ -466,12 +442,12 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
}
void
-Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* session)
+Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session)
{
- ServerEntrySeq entries;
ApplicationInfo oldApp;
- ApplicationDescriptor newDesc;
ApplicationUpdateInfo update = updt;
+ auto_ptr<ApplicationHelper> previous;
+ auto_ptr<ApplicationHelper> helper;
try
{
Lock sync(*this);
@@ -495,20 +471,8 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
update.revision = oldApp.revision + 1;
}
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, previous.update(update.descriptor), true);
- newDesc = helper.getDefinition();
-
- checkForUpdate(previous, helper, connection);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
- saveApplication(info, connection);
-
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
+ previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
+ helper.reset(new ApplicationHelper(_communicator, previous->update(update.descriptor), true));
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
@@ -517,15 +481,16 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
halt(_communicator, ex);
}
- finishApplicationUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
}
void
-Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminSessionI* session)
+Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool noRestart, AdminSessionI* session)
{
- ServerEntrySeq entries;
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
+ auto_ptr<ApplicationHelper> previous;
+ auto_ptr<ApplicationHelper> helper;
try
{
Lock sync(*this);
@@ -544,25 +509,14 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
throw ApplicationNotExistException(newDesc.name);
}
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, newDesc, true);
+ previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
+ helper.reset(new ApplicationHelper(_communicator, newDesc, true));
update.updateTime = IceUtil::Time::now().toMilliSeconds();
update.updateUser = _lockUserId;
update.revision = oldApp.revision + 1;
- update.descriptor = helper.diff(previous);
+ update.descriptor = helper->diff(*previous);
- checkForUpdate(previous, helper, connection);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
- saveApplication(info, connection);
-
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
-
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
catch(const DatabaseException& ex)
@@ -570,7 +524,7 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
halt(_communicator, ex);
}
- finishApplicationUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
}
void
@@ -579,10 +533,10 @@ Database::instantiateServer(const string& application,
const ServerInstanceDescriptor& instance,
AdminSessionI* session)
{
- ServerEntrySeq entries;
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
- ApplicationDescriptor newDesc;
+ auto_ptr<ApplicationHelper> previous;
+ auto_ptr<ApplicationHelper> helper;
try
{
@@ -602,25 +556,13 @@ Database::instantiateServer(const string& application,
throw ApplicationNotExistException(application);
}
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true);
- newDesc = helper.getDefinition();
+ previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
+ helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
update.updateTime = IceUtil::Time::now().toMilliSeconds();
update.updateUser = _lockUserId;
update.revision = oldApp.revision + 1;
- update.descriptor = helper.diff(previous);
-
- checkForUpdate(previous, helper, connection);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
- saveApplication(info, connection);
-
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
+ update.descriptor = helper->diff(*previous);
startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
}
@@ -629,7 +571,7 @@ Database::instantiateServer(const string& application,
halt(_communicator, ex);
}
- finishApplicationUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(update, oldApp, *previous, *helper, session, true);
}
void
@@ -1034,7 +976,7 @@ Database::getAdapterInfo(const string& id)
// Otherwise, we check the adapter endpoint table -- if there's an
// entry the adapter is managed by the registry itself.
//
- DatabaseConnectionPtr connection = _connectionPool->getConnection();
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
AdaptersWrapperPtr adaptersWrapper = _connectionPool->getAdapters(connection);
AdapterInfoSeq infos;
try
@@ -1878,7 +1820,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- entries.push_back(_serverCache.add(p->second));
+ entries.push_back(_serverCache.add(p->second, false));
}
}
@@ -1914,7 +1856,8 @@ Database::reload(const ApplicationHelper& oldApp,
const ApplicationHelper& newApp,
ServerEntrySeq& entries,
const string& uuid,
- int revision)
+ int revision,
+ bool noRestart)
{
const string application = oldApp.getInstance().name;
@@ -1939,7 +1882,7 @@ Database::reload(const ApplicationHelper& oldApp,
else
{
ServerEntryPtr server = _serverCache.get(p->first);
- server->update(q->second); // Just update the server revision on the node.
+ server->update(q->second, noRestart); // Just update the server revision on the node.
entries.push_back(server);
}
}
@@ -2026,7 +1969,7 @@ Database::reload(const ApplicationHelper& oldApp,
//
for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q)
{
- entries.push_back(_serverCache.add(*q));
+ entries.push_back(_serverCache.add(*q, noRestart));
}
}
@@ -2079,17 +2022,205 @@ Database::removeApplication(const string& name, const DatabaseConnectionPtr& con
}
void
-Database::finishApplicationUpdate(ServerEntrySeq& entries,
- const ApplicationUpdateInfo& update,
- const ApplicationInfo& oldApp,
- const ApplicationDescriptor& newDesc,
- AdminSessionI* session)
+Database::checkUpdate(const ApplicationHelper& oldApp,
+ const ApplicationHelper& newApp,
+ const string& uuid,
+ int revision,
+ bool noRestart)
{
+ const string application = oldApp.getInstance().name;
+
+ map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
+ map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision + 1);
+
+ map<string, ServerInfo>::const_iterator p;
+ vector<string> servers;
+ vector<string> reasons;
+ if(noRestart)
+ {
+ for(p = oldServers.begin(); p != oldServers.end(); ++p)
+ {
+ map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
+ if(q == newServers.end())
+ {
+ servers.push_back(p->first);
+ reasons.push_back("server `" + p->first + "' needs to be removed");
+ }
+ }
+ }
+
+ vector<CheckUpdateResultPtr> results;
+ set<string> unreachableNodes;
+ for(p = newServers.begin(); p != newServers.end(); ++p)
+ {
+ map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
+ if(q != oldServers.end() && isServerUpdated(p->second, q->second))
+ {
+ if(noRestart && isServerUpdated(p->second, q->second, true)) // Ignore properties
+ {
+ //
+ // The updates are not only property updates and noRestart is required, no
+ // need to check the server update on the node, we know already it requires
+ // a restart.
+ //
+ servers.push_back(p->first);
+ reasons.push_back("update requires the server `" + p->first + "' to be stopped");
+ }
+ else
+ {
+ //
+ // Ask the node to check the server update.
+ //
+ try
+ {
+ CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(p->second, noRestart);
+ if(result)
+ {
+ results.push_back(result);
+ }
+ }
+ catch(const NodeUnreachableException& ex)
+ {
+ unreachableNodes.insert(ex.name);
+ }
+ catch(const DeploymentException& ex)
+ {
+ servers.push_back(p->first);
+ reasons.push_back(ex.reason);
+ }
+
+ }
+ }
+ }
+
+ for(vector<CheckUpdateResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
+ {
+ try
+ {
+ (*q)->getResult();
+ }
+ catch(const NodeUnreachableException& ex)
+ {
+ unreachableNodes.insert(ex.name);
+ }
+ catch(const DeploymentException& ex)
+ {
+ servers.push_back((*q)->getServer());
+ reasons.push_back(ex.reason);
+ }
+ }
+
+ if(noRestart)
+ {
+ if(!servers.empty() || !unreachableNodes.empty())
+ {
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "check for application `" << application << "' update failed:";
+ if(!unreachableNodes.empty())
+ {
+ Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
+ if(nodes.size() == 1)
+ {
+ out << "\nthe node `" << nodes[0] << "' is down";
+ }
+ else
+ {
+ out << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
+ }
+ }
+ if(!reasons.empty())
+ {
+ for(vector<string>::const_iterator p = reasons.begin(); p != reasons.end(); ++p)
+ {
+ out << "\n" << *p;
+ }
+ }
+ }
+
+ ostringstream os;
+ os << "check for application `" << application << "' update failed:";
+ if(!servers.empty())
+ {
+ if(servers.size() == 1)
+ {
+ os << "\nthe server `" << servers[0] << "' would need to be stopped";
+ }
+ else
+ {
+ os << "\nthe servers `" << toString(servers, ", ") << "' would need to be stopped";
+ }
+ }
+ if(!unreachableNodes.empty())
+ {
+ Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
+ if(nodes.size() == 1)
+ {
+ os << "\nthe node `" << nodes[0] << "' is down";
+ }
+ else
+ {
+ os << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
+ }
+ }
+ throw DeploymentException(os.str());
+ }
+ }
+ else if(!reasons.empty())
+ {
+ ostringstream os;
+ os << "check for application `" << application << "' update failed:";
+ for(vector<string>::const_iterator p = reasons.begin(); p != reasons.end(); ++p)
+ {
+ os << "\n" << *p;
+ }
+ throw DeploymentException(os.str());
+ }
+}
+
+void
+Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
+ const ApplicationInfo& oldApp,
+ const ApplicationHelper& previous,
+ const ApplicationHelper& helper,
+ AdminSessionI* session,
+ bool noRestart)
+{
+ const ApplicationDescriptor& newDesc = helper.getDefinition();
+ DatabaseConnectionPtr connection = _connectionPool->newConnection();
+
+ ServerEntrySeq entries;
+ try
+ {
+ if(_master)
+ {
+ checkUpdate(previous, helper, oldApp.uuid, oldApp.revision, noRestart);
+ }
+
+ Lock sync(*this);
+ checkForUpdate(previous, helper, connection);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
+ }
+ catch(const DeploymentException&)
+ {
+ finishUpdating(update.descriptor.name);
+ throw;
+ }
+
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
int serial;
{
Lock sync(*this);
+
+ ApplicationInfo info = oldApp;
+ info.updateTime = update.updateTime;
+ info.updateUser = update.updateUser;
+ info.revision = update.revision;
+ info.descriptor = newDesc;
+ saveApplication(info, connection);
+
++_applicationSerial;
serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
}
@@ -2133,8 +2264,8 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
ApplicationInfo info = oldApp;
info.revision = update.revision + 1;
- saveApplication(info, _connectionPool->getConnection());
- reload(previous, helper, entries, info.uuid, info.revision);
+ saveApplication(info, connection);
+ reload(previous, helper, entries, info.uuid, info.revision, noRestart);
newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
newUpdate.updateUser = _lockUserId;