diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 71 |
1 files changed, 41 insertions, 30 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 35bfd9d9013..470c6cd2490 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -229,8 +229,8 @@ NodeI::~NodeI() void NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB, - const ServerInfo& info, - bool fromMaster, + const InternalServerDescriptorPtr& descriptor, + const string& replicaName, const Ice::Current& current) { ServerCommandPtr command; @@ -238,7 +238,7 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB, Lock sync(*this); ++_serial; - Ice::Identity id = createServerIdentity(info.descriptor->id); + Ice::Identity id = createServerIdentity(descriptor->id); // // Check if we already have a servant for this server. If that's @@ -252,14 +252,14 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB, if(!server) { ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id)); - server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime); + server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime); _adapter->add(server, id); added = true; } try { - command = server->load(amdCB, info, fromMaster); + command = server->load(amdCB, descriptor, replicaName); } catch(const Ice::ObjectNotExistException&) { @@ -288,6 +288,7 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB, const string& serverId, const string& uuid, int revision, + const string& replicaName, const Ice::Current& current) { ServerCommandPtr command; @@ -306,7 +307,7 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB, // try { - command = server->destroy(amdCB, uuid, revision); + command = server->destroy(amdCB, uuid, revision, replicaName); } catch(const Ice::ObjectNotExistException&) { @@ -323,7 +324,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, const PatcherFeedbackPrx& feedback, const string& application, const string& server, - const DistributionDescriptor& appDistrib, + const InternalDistributionDescriptorPtr& appDistrib, bool shutdown, const Ice::Current&) { @@ -339,7 +340,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, } set<ServerIPtr> servers; - if(!appDistrib.icepatch.empty()) + if(!appDistrib->icepatch.empty()) { // // Get all the application servers (even the ones which @@ -357,13 +358,13 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, set<ServerIPtr>::iterator s = servers.begin(); while(s != servers.end()) { - if((*s)->getDistribution().icepatch.empty()) + if((*s)->getDistribution()) { - servers.erase(s++); + ++s; } else { - ++s; + servers.erase(s++); } } } @@ -405,7 +406,7 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, } } - if((servers.empty() || !appDistrib.icepatch.empty()) && !running.empty()) + if((servers.empty() || !appDistrib->icepatch.empty()) && !running.empty()) { if(running.size() == 1) { @@ -426,14 +427,14 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, // Patch the application. // FileServerPrx icepatch; - if(!appDistrib.icepatch.empty()) + if(!appDistrib->icepatch.empty()) { - icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib.icepatch)); + icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(appDistrib->icepatch)); if(!icepatch) { - throw "proxy `" + appDistrib.icepatch + "' is not a file server."; + throw "proxy `" + appDistrib->icepatch + "' is not a file server."; } - patch(icepatch, "distrib/" + application, appDistrib.directories); + patch(icepatch, "distrib/" + application, appDistrib->directories); } // @@ -441,18 +442,18 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, // for(s = servers.begin(); s != servers.end(); ++s) { - DistributionDescriptor dist = (*s)->getDistribution(); - if(dist.icepatch.empty() || (!server.empty() && (*s)->getId() != server)) + InternalDistributionDescriptorPtr dist = (*s)->getDistribution(); + if(!dist || (!server.empty() && (*s)->getId() != server)) { continue; } - icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist.icepatch)); + icepatch = FileServerPrx::checkedCast(_communicator->stringToProxy(dist->icepatch)); if(!icepatch) { - throw "proxy `" + dist.icepatch + "' is not a file server."; + throw "proxy `" + dist->icepatch + "' is not a file server."; } - patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist.directories); + patch(icepatch, "servers/" + (*s)->getId() + "/distrib", dist->directories); if(!server.empty()) { @@ -499,7 +500,6 @@ NodeI::patch_async(const AMD_Node_patchPtr& amdCB, } catch(const Ice::LocalException& ex) { - cerr << ex << endl; } } @@ -640,7 +640,7 @@ NodeI::getRedirectErrToOut() const NodeSessionPrx NodeI::registerWithRegistry(const InternalRegistryPrx& registry) { - return registry->registerNode(_platform.getNodeInfo(), _proxy); + return registry->registerNode(_platform.getInternalNodeInfo(), _proxy); } void @@ -658,6 +658,7 @@ NodeI::checkConsistency(const NodeSessionPrx& session) // unsigned long serial = 0; Ice::StringSeq servers; + vector<ServerCommandPtr> commands; while(true) { { @@ -665,7 +666,7 @@ NodeI::checkConsistency(const NodeSessionPrx& session) if(serial == _serial) { _serial = 1; // We can reset the serial number. - checkConsistencyNoSync(servers); + commands = checkConsistencyNoSync(servers); break; } serial = _serial; @@ -681,6 +682,8 @@ NodeI::checkConsistency(const NodeSessionPrx& session) } sort(servers.begin(), servers.end()); } + + for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute)); } void @@ -839,9 +842,11 @@ NodeI::removeServer(const ServerIPtr& server, const std::string& application, bo } } -void +vector<ServerCommandPtr> NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) { + vector<ServerCommandPtr> commands; + // // Check if the servers directory doesn't contain more servers // than the registry really knows. @@ -855,7 +860,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) { Ice::Error out(_traceLevels->logger); out << "couldn't read directory `" << _serversDir << "':" << msg; - return; + return commands; } vector<string> remove; @@ -876,7 +881,11 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) // try { - server->destroy(0, "", 0); + ServerCommandPtr command = server->destroy(0, "", 0, "Master"); + if(command) + { + commands.push_back(command); + } p = remove.erase(p); continue; } @@ -916,9 +925,9 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) if(remove.empty()) { - return; + return commands; } - + // // If there's server that couldn't be removed we move them to the // temporary backup directory. First, we rotate the temporary @@ -933,7 +942,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) { Ice::Error out(_traceLevels->logger); out << "couldn't read directory `" << _tmpDir << "':" << msg; - return; + return commands; } if(contents.size() < 10) @@ -975,6 +984,8 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) Ice::Warning out(_traceLevels->logger); out << "rotation failed: " << msg; } + + return commands; } NodeSessionPrx |