summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp71
1 files changed, 41 insertions, 30 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 35bfd9d9013..470c6cd2490 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -229,8 +229,8 @@ NodeI::~NodeI()
void
NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
- const ServerInfo& info,
- bool fromMaster,
+ const InternalServerDescriptorPtr& descriptor,
+ const string& replicaName,
const Ice::Current& current)
{
ServerCommandPtr command;
@@ -238,7 +238,7 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
Lock sync(*this);
++_serial;
- Ice::Identity id = createServerIdentity(info.descriptor->id);
+ Ice::Identity id = createServerIdentity(descriptor->id);
//
// Check if we already have a servant for this server. If that's
@@ -252,14 +252,14 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
if(!server)
{
ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
- server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime);
+ server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime);
_adapter->add(server, id);
added = true;
}
try
{
- command = server->load(amdCB, info, fromMaster);
+ command = server->load(amdCB, descriptor, replicaName);
}
catch(const Ice::ObjectNotExistException&)
{
@@ -288,6 +288,7 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
const string& serverId,
const string& uuid,
int revision,
+ const string& replicaName,
const Ice::Current& current)
{
ServerCommandPtr command;
@@ -306,7 +307,7 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
//
try
{
- command = server->destroy(amdCB, uuid, revision);
+ command = server->destroy(amdCB, uuid, revision, replicaName);
}
catch(const Ice::ObjectNotExistException&)
{
@@ -323,7 +324,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
const PatcherFeedbackPrx& feedback,
const string& application,
const string& server,
- const DistributionDescriptor& appDistrib,
+ const InternalDistributionDescriptorPtr& appDistrib,
bool shutdown,
const Ice::Current&)
{
@@ -339,7 +340,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
}
set<ServerIPtr> servers;
- if(!appDistrib.icepatch.empty())
+ if(!appDistrib->icepatch.empty())
{
//
// Get all the application servers (even the ones which
@@ -357,13 +358,13 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
set<ServerIPtr>::iterator s = servers.begin();
while(s != servers.end())
{
- if((*s)->getDistribution().icepatch.empty())
+ if((*s)->getDistribution())
{
- servers.erase(s++);
+ ++s;
}
else
{
- ++s;
+ servers.erase(s++);
}
}
}
@@ -405,7 +406,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
}
}
- if((servers.empty() || !appDistrib.icepatch.empty()) && !running.empty())
+ if((servers.empty() || !appDistrib->icepatch.empty()) && !running.empty())
{
if(running.size() == 1)
{
@@ -426,14 +427,14 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
// Patch the application.
//
FileServerPrx icepatch;
- if(!appDistrib.icepatch.empty())
+ if(!appDistrib->icepatch.empty())
{
- icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib.icepatch));
+ icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib->icepatch));
if(!icepatch)
{
- throw "proxy `" + appDistrib.icepatch + "' is not a file server.";
+ throw "proxy `" + appDistrib->icepatch + "' is not a file server.";
}
- patch(icepatch, "distrib/" + application, appDistrib.directories);
+ patch(icepatch, "distrib/" + application, appDistrib->directories);
}
//
@@ -441,18 +442,18 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
//
for(s = servers.begin(); s != servers.end(); ++s)
{
- DistributionDescriptor dist = (*s)->getDistribution();
- if(dist.icepatch.empty() || (!server.empty() && (*s)->getId() != server))
+ InternalDistributionDescriptorPtr dist = (*s)->getDistribution();
+ if(!dist || (!server.empty() && (*s)->getId() != server))
{
continue;
}
- icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist.icepatch));
+ icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist->icepatch));
if(!icepatch)
{
- throw "proxy `" + dist.icepatch + "' is not a file server.";
+ throw "proxy `" + dist->icepatch + "' is not a file server.";
}
- patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist.directories);
+ patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist->directories);
if(!server.empty())
{
@@ -499,7 +500,6 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB,
}
catch(const Ice::LocalException& ex)
{
- cerr << ex << endl;
}
}
@@ -640,7 +640,7 @@ NodeI::getRedirectErrToOut() const
NodeSessionPrx
NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
{
- return registry->registerNode(_platform.getNodeInfo(), _proxy);
+ return registry->registerNode(_platform.getInternalNodeInfo(), _proxy);
}
void
@@ -658,6 +658,7 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
//
unsigned long serial = 0;
Ice::StringSeq servers;
+ vector<ServerCommandPtr> commands;
while(true)
{
{
@@ -665,7 +666,7 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
if(serial == _serial)
{
_serial = 1; // We can reset the serial number.
- checkConsistencyNoSync(servers);
+ commands = checkConsistencyNoSync(servers);
break;
}
serial = _serial;
@@ -681,6 +682,8 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
}
sort(servers.begin(), servers.end());
}
+
+ for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute));
}
void
@@ -839,9 +842,11 @@ NodeI::removeServer(const ServerIPtr& server, const std::string& application, bo
}
}
-void
+vector<ServerCommandPtr>
NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
{
+ vector<ServerCommandPtr> commands;
+
//
// Check if the servers directory doesn't contain more servers
// than the registry really knows.
@@ -855,7 +860,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
{
Ice::Error out(_traceLevels->logger);
out << "couldn't read directory `" << _serversDir << "':" << msg;
- return;
+ return commands;
}
vector<string> remove;
@@ -876,7 +881,11 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
//
try
{
- server->destroy(0, "", 0);
+ ServerCommandPtr command = server->destroy(0, "", 0, "Master");
+ if(command)
+ {
+ commands.push_back(command);
+ }
p = remove.erase(p);
continue;
}
@@ -916,9 +925,9 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
if(remove.empty())
{
- return;
+ return commands;
}
-
+
//
// If there's server that couldn't be removed we move them to the
// temporary backup directory. First, we rotate the temporary
@@ -933,7 +942,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
{
Ice::Error out(_traceLevels->logger);
out << "couldn't read directory `" << _tmpDir << "':" << msg;
- return;
+ return commands;
}
if(contents.size() < 10)
@@ -975,6 +984,8 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
Ice::Warning out(_traceLevels->logger);
out << "rotation failed: " << msg;
}
+
+ return commands;
}
NodeSessionPrx