diff options
Diffstat (limited to 'cpp/src/IceGrid/ServerI.cpp')
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 137 |
1 files changed, 119 insertions, 18 deletions
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 2fca2d0c00d..9c24a7a2dec 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -151,7 +151,9 @@ public: catch(const ServerStartException& ex) { Ice::Error out(_traceLevels->logger); - out << "couldn't reactivate server `" + _server->getId() + "' with `always' activation mode after failure:\n" << ex.reason; + out << "couldn't reactivate server `" << _server->getId() + << "' with `always' activation mode after failure:\n" + << ex.reason; } } } @@ -162,6 +164,31 @@ private: const TraceLevelsPtr _traceLevels; }; +class WaitForApplicationReplicationCB : public AMI_NodeSession_waitForApplicationReplication +{ +public: + + WaitForApplicationReplicationCB(const ServerIPtr& server) : _server(server) + { + } + + virtual void + ice_response() + { + _server->activate(); + } + + virtual void + ice_exception(const Ice::Exception&) + { + _server->activate(); + } + +private: + + const ServerIPtr _server; +}; + struct EnvironmentEval : std::unary_function<string, string> { string @@ -837,6 +864,12 @@ ServerI::start(ServerActivation activation, const AMD_Server_startPtr& amdCB) } else if(_state == Active) { + if(activation == Always) + { + return; // Nothing to do, it's already active (and we + // don't want to throw because it would be + // considered as an error.) + } throw ServerStartException(_id, "The server is already active."); } else if(_state == Destroying) @@ -885,12 +918,13 @@ ServerI::load(const AMD_Node_loadServerPtr& amdCB, const ServerInfo& info, bool // In any case, we update the server application revision if // it needs to be updated. // - if(!fromMaster) + if(!_info.uuid.empty() && !fromMaster) { if(_info.uuid != info.uuid) { - throw DeploymentException("server descriptor from replica is from another application (`" + - info.uuid + "'"); + DeploymentException ex; + ex.reason = "server descriptor from replica is from another application (`" + info.uuid + "')"; + throw ex; } else if(_info.revision > info.revision) { @@ -902,7 +936,18 @@ ServerI::load(const AMD_Node_loadServerPtr& amdCB, const ServerInfo& info, bool } } - if(_info.descriptor && _info.sessionId == info.sessionId && + // + // Otherwise, if the following conditions are met: + // + // - the server is already loaded. + // - the descriptor is from the master and the session id didn't change or it's coming from a slave. + // - the descriptor is the same as the one loaded. + // + // we don't re-load the server. We just return the server + // proxy and the proxies of its adapters. + // + if(_info.descriptor && + (!fromMaster || _info.sessionId == info.sessionId) && (_info.uuid == info.uuid && _info.revision == info.revision || descriptorEqual(_node->getCommunicator(), _info.descriptor, info.descriptor))) { @@ -956,11 +1001,35 @@ ServerI::load(const AMD_Node_loadServerPtr& amdCB, const ServerInfo& info, bool } void -ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB) +ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB, const string& uuid, int revision) { ServerCommandPtr command; { Lock sync(*this); + if(!uuid.empty()) // Empty if from checkConsistency. + { + if(_info.uuid.empty()) + { + DeploymentException ex; + ex.reason = "server doesn't exist (`" + uuid + "')"; + throw ex; + } + else if(_info.uuid != uuid) + { + DeploymentException ex; + ex.reason = "server descriptor from replica is from another application (`" + uuid + "')"; + throw ex; + } + else if(_info.revision > revision) + { + ostringstream os; + os << "server descriptor from replica is too old:\n"; + os << "current revision: " << _info.revision << "\n"; + os << "replica revision: " << revision; + throw DeploymentException(os.str()); + } + } + if(!_destroy) { _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching); @@ -1180,8 +1249,10 @@ ServerI::activationFailed(bool destroyed) void ServerI::activate() { - ServerDescriptorPtr desc; + ServerInfo info; ServerAdapterDict adpts; + bool waitForReplication; + #ifndef _WIN32 uid_t uid; gid_t gid; @@ -1189,8 +1260,18 @@ ServerI::activate() { Lock sync(*this); assert(_state == Activating && _info.descriptor); - desc = _info.descriptor; + info = _info; adpts = _adapters; + + // + // The first time the server is started, we ensure that the + // replication of its descriptor is completed. This is to make + // sure all the replicas are up to date when the server + // starts for the first time with a given descriptor. + // + waitForReplication = _waitForReplication; + _waitForReplication = false; + #ifndef _WIN32 uid = _uid; gid = _gid; @@ -1198,8 +1279,25 @@ ServerI::activate() } // + // We first ensure that the application is replicated on all the + // registries before to start the server. We only do this each + // time the server is updated or the initialy loaded on the node. + // + if(waitForReplication) + { + NodeSessionPrx session = _node->getMasterNodeSession(); + if(session) + { + AMI_NodeSession_waitForApplicationReplicationPtr cb = new WaitForApplicationReplicationCB(this); + _node->getMasterNodeSession()->waitForApplicationReplication_async(cb, info.uuid, info.revision); + return; + } + } + + // // Compute the server command line options. // + ServerDescriptorPtr desc = info.descriptor; Ice::StringSeq options; copy(desc->options.begin(), desc->options.end(), back_inserter(options)); options.push_back("--Ice.Config=" + _serverDir + "/config/config"); @@ -1608,6 +1706,7 @@ ServerI::updateImpl(const ServerInfo& info) assert(_load); _info = info; + _waitForReplication = true; if(!_info.descriptor) { @@ -2157,7 +2256,8 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason) } else if(_state == Inactive) { - if(_activation == Disabled && _previousActivation == Always && + if(_activation == Always || + _activation == Disabled && _previousActivation == Always && _disableOnFailure > 0 && _failureTime != IceUtil::Time()) { // @@ -2168,15 +2268,16 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason) // callback is executed. // _timer = new DelayedStart(this, _node->getTraceLevels()); - _node->getWaitQueue()->add(_timer, IceUtil::Time::seconds(_disableOnFailure) + IceUtil::Time::milliSeconds(500)); - } - else if(_activation == Always) - { - if(!_start) - { - _start = new StartCommand(this, _node->getWaitQueue(), _activationTimeout); - } - } + _node->getWaitQueue()->add(_timer, + IceUtil::Time::seconds(_disableOnFailure) + IceUtil::Time::milliSeconds(500)); + } +// else if(_activation == Always) +// { +// if(!_start) +// { +// _start = new StartCommand(this, _node->getWaitQueue(), _activationTimeout); +// } +// } } // |