diff options
Diffstat (limited to 'cpp/src/IceGrid/ServerI.cpp')
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 580 |
1 files changed, 403 insertions, 177 deletions
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 2fddb91959f..dbbce6c3467 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -18,6 +18,7 @@ #include <IceGrid/NodeI.h> #include <IceGrid/Util.h> #include <IceGrid/ServerAdapterI.h> +#include <IceGrid/WaitQueue.h> #include <IcePatch2/Util.h> @@ -37,12 +38,42 @@ using namespace std; using namespace IceGrid; using namespace IcePatch2; -ServerI::ServerI(const NodeIPtr& node, const string& serversDir, const string& name) : +namespace IceGrid +{ + +class WaitForActivationItem : public WaitItem +{ +public: + + WaitForActivationItem(const ServerIPtr& server) : + WaitItem(server), + _server(server) + { + } + + virtual void execute() + { + } + + virtual void expired(bool destroyed) + { + _server->activationFailed(!destroyed); + } + +private: + + const ServerIPtr _server; +}; + +} + +ServerI::ServerI(const NodeIPtr& node, const ServerPrx& proxy, const string& serversDir, const string& name, int wt) : _node(node), + _this(proxy), _name(name), - _waitTime(_node->getCommunicator()->getProperties()->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60)), + _waitTime(wt), _serversDir(serversDir), - _state(Inactive) + _state(ServerI::Inactive) { assert(_node->getActivator()); } @@ -58,13 +89,13 @@ ServerI::load(const ServerDescriptorPtr& descriptor, StringAdapterPrxDict& adapt { { Lock sync(*this); - if(_state == Destroying || _state == Destroyed) + if(_state == ServerI::Destroying || _state == ServerI::Destroyed) { Ice::ObjectNotExistException ex(__FILE__,__LINE__); - ex.id = current.id; + ex.id = _this->ice_getIdentity(); throw ex; } - else if(_state == Inactive) + else if(_state == ServerI::Inactive) { // // If the server is inactive we can update its descriptor and its directory. @@ -84,107 +115,16 @@ ServerI::load(const ServerDescriptorPtr& descriptor, StringAdapterPrxDict& adapt } // - // If the server is not inactive we stop it and try again to update it. + // If the server is active we stop it and try again to update it. // stop(current); } } -bool -ServerI::start(ServerActivation act, const Ice::Current& current) +void +ServerI::start_async(const AMD_Server_startPtr& amdCB, const Ice::Current& current) { - ServerDescriptorPtr desc; - while(true) - { - Lock sync(*this); - switch(_state) - { - case Inactive: - { - if(act < _activation) - { - return false; - } - - setStateNoSync(Activating, current); - break; - } - case Activating: - case Deactivating: - { - wait(); // TODO: Timeout? - continue; - } - case Active: - { - return true; // Raise an exception instead? - } - case Destroying: - case Destroyed: - { - Ice::ObjectNotExistException ex(__FILE__,__LINE__); - ex.id = current.id; - throw ex; - } - } - - assert(_state == Activating); - - desc = _desc; - break; - } - - // - // Compute the server command line options. - // - Ice::StringSeq options; - string exe; - if(!desc->interpreter.empty()) - { - if(desc->interpreter == "icebox") - { - exe = desc->exe.empty() ? "icebox" : desc->exe; - copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); - } - else if(desc->interpreter == "java-icebox") - { - exe = desc->exe.empty() ? "java" : desc->exe; - copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); - options.push_back("IceBox.Server"); - } - else - { - exe = desc->interpreter; - copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); - options.push_back(desc->exe); - } - } - else - { - exe = desc->exe; - } - copy(desc->options.begin(), desc->options.end(), back_inserter(options)); - options.push_back("--Ice.Config=" + _serverDir + "/config/config"); - - Ice::StringSeq envs; - copy(desc->envs.begin(), desc->envs.end(), back_inserter(envs)); - - try - { - ServerPrx self = ServerPrx::uncheckedCast(current.adapter->createProxy(current.id)); - bool active = _node->getActivator()->activate(desc->name, exe, desc->pwd, options, envs, self); - setState(active ? Active : Inactive, current); - return active; - } - catch(const Ice::SyscallException& ex) - { - Ice::Warning out(_node->getTraceLevels()->logger); - out << "activation failed for server `" << _name << "':\n"; - out << ex; - - setState(Inactive, current); - return false; - } + startInternal(Manual, amdCB); } void @@ -195,31 +135,32 @@ ServerI::stop(const Ice::Current& current) Lock sync(*this); switch(_state) { - case Inactive: + case ServerI::Inactive: { return; } - case Activating: - case Deactivating: + case ServerI::Activating: + case ServerI::Deactivating: { wait(); // TODO: Timeout? continue; } - case Active: + case ServerI::WaitForActivation: + case ServerI::Active: { - setStateNoSync(Deactivating, current); + setStateNoSync(ServerI::Deactivating); break; } - case Destroying: - case Destroyed: + case ServerI::Destroying: + case ServerI::Destroyed: { Ice::ObjectNotExistException ex(__FILE__,__LINE__); - ex.id = current.id; + ex.id = _this->ice_getIdentity(); throw ex; } } - assert(_state == Deactivating); + assert(_state == ServerI::Deactivating); break; } @@ -258,33 +199,34 @@ ServerI::destroy(const Ice::Current& current) Lock sync(*this); switch(_state) { - case Inactive: + case ServerI::Inactive: { - setStateNoSync(Destroyed, current); + setStateNoSync(ServerI::Destroyed); break; } - case Active: + case ServerI::WaitForActivation: + case ServerI::Active: { stop = true; - setStateNoSync(Destroying, current); + setStateNoSync(ServerI::Destroying); break; } - case Activating: - case Deactivating: + case ServerI::Activating: + case ServerI::Deactivating: { wait(); // TODO: Timeout? continue; } - case Destroying: - case Destroyed: + case ServerI::Destroying: + case ServerI::Destroyed: { Ice::ObjectNotExistException ex(__FILE__,__LINE__); - ex.id = current.id; + ex.id = _this->ice_getIdentity(); throw ex; } } - assert(_state == Destroyed || _state == Destroying); + assert(_state == ServerI::Destroyed || _state == ServerI::Destroying); break; } @@ -296,11 +238,11 @@ ServerI::destroy(const Ice::Current& current) // // Destroy the object adapters. // - for(StringAdapterPrxDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + for(map<string, ServerAdapterIPtr>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { try { - p->second->destroy(); + p->second->destroy(current); } catch(const Ice::LocalException&) { @@ -328,51 +270,52 @@ ServerI::destroy(const Ice::Current& current) void ServerI::terminated(const Ice::Current& current) { - ServerState newState = Inactive; // Initialize to keep the compiler happy. - StringAdapterPrxDict adpts; + InternalServerState newState = ServerI::Inactive; // Initialize to keep the compiler happy. + map<string, ServerAdapterIPtr> adpts; while(true) { Lock sync(*this); switch(_state) { - case Inactive: + case ServerI::Inactive: { assert(false); } - case Activating: + case ServerI::Activating: { wait(); // TODO: Timeout? continue; } - case Active: + case ServerI::WaitForActivation: + case ServerI::Active: { - setStateNoSync(Deactivating, current); - newState = Inactive; + setStateNoSync(ServerI::Deactivating); + newState = ServerI::Inactive; break; } - case Deactivating: + case ServerI::Deactivating: { // // Deactivation was initiated by the stop method. // - newState = Inactive; + newState = ServerI::Inactive; break; } - case Destroying: + case ServerI::Destroying: { // // Deactivation was initiated by the destroy method. // - newState = Destroyed; + newState = ServerI::Destroyed; break; } - case Destroyed: + case ServerI::Destroyed: { assert(false); } } - assert(_state == Deactivating || _state == Destroying); + assert(_state == ServerI::Deactivating || _state == ServerI::Destroying); adpts = _adapters; // @@ -382,18 +325,18 @@ ServerI::terminated(const Ice::Current& current) break; } - if(newState != Destroyed) + if(newState != ServerI::Destroyed) { // // The server has terminated, set its adapter direct proxies to // null to cause the server re-activation if one of its adapter // direct proxy is requested. // - for(StringAdapterPrxDict::iterator p = adpts.begin(); p != adpts.end(); ++p) + for(map<string, ServerAdapterIPtr>::iterator p = adpts.begin(); p != adpts.end(); ++p) { try { - p->second->setDirectProxy(0); + p->second->setDirectProxy(0, current); } catch(const Ice::ObjectNotExistException&) { @@ -401,18 +344,18 @@ ServerI::terminated(const Ice::Current& current) } } - setState(newState, current); + setState(newState); } ServerState -ServerI::getState(const Ice::Current&) +ServerI::getState(const Ice::Current&) const { Lock sync(*this); - return _state; + return toServerState(_state); } Ice::Int -ServerI::getPid(const Ice::Current& current) +ServerI::getPid(const Ice::Current&) const { return _node->getActivator()->getServerPid(_name); } @@ -425,14 +368,14 @@ ServerI::setActivationMode(ServerActivation mode, const ::Ice::Current&) } ServerActivation -ServerI::getActivationMode(const ::Ice::Current&) +ServerI::getActivationMode(const ::Ice::Current&) const { Lock sync(*this); return _activation; } ServerDescriptorPtr -ServerI::getDescriptor(const Ice::Current&) +ServerI::getDescriptor(const Ice::Current&) const { Lock sync(*this); return _desc; @@ -446,11 +389,252 @@ ServerI::setProcess(const ::Ice::ProcessPrx& proc, const ::Ice::Current&) notifyAll(); } -StringAdapterPrxDict -ServerI::getAdapters(const Ice::Current&) +bool +ServerI::startInternal(ServerActivation act, const AMD_Server_startPtr& amdCB) +{ + ServerDescriptorPtr desc; + while(true) + { + Lock sync(*this); + switch(_state) + { + case ServerI::Inactive: + { + if(act < _activation) + { + if(amdCB) + { + amdCB->ice_response(false); + } + return false; + } + + setStateNoSync(ServerI::Activating); + break; + } + case ServerI::Activating: + case ServerI::Deactivating: + { + wait(); // TODO: Timeout? + continue; + } + case ServerI::WaitForActivation: + { + if(amdCB) + { + _startCB.push_back(amdCB); + } + return true; + } + case ServerI::Active: + { + if(amdCB) + { + amdCB->ice_response(true); + } + return true; + } + case ServerI::Destroying: + case ServerI::Destroyed: + { + Ice::ObjectNotExistException ex(__FILE__,__LINE__); + ex.id = _this->ice_getIdentity(); + throw ex; + } + } + + assert(_state == ServerI::Activating); + + desc = _desc; + if(amdCB) + { + _startCB.push_back(amdCB); + } + break; + } + + // + // Compute the server command line options. + // + Ice::StringSeq options; + string exe; + if(!desc->interpreter.empty()) + { + if(desc->interpreter == "icebox") + { + exe = desc->exe.empty() ? "icebox" : desc->exe; + copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); + } + else if(desc->interpreter == "java-icebox") + { + exe = desc->exe.empty() ? "java" : desc->exe; + copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); + options.push_back("IceBox.Server"); + } + else + { + exe = desc->interpreter; + copy(desc->interpreterOptions.begin(), desc->interpreterOptions.end(), back_inserter(options)); + options.push_back(desc->exe); + } + } + else + { + exe = desc->exe; + } + copy(desc->options.begin(), desc->options.end(), back_inserter(options)); + options.push_back("--Ice.Config=" + _serverDir + "/config/config"); + + Ice::StringSeq envs; + copy(desc->envs.begin(), desc->envs.end(), back_inserter(envs)); + + try + { + bool started = _node->getActivator()->activate(desc->name, exe, desc->pwd, options, envs, _this); + if(!started) + { + setState(ServerI::Inactive); + return false; + } + else + { + Lock sync(*this); + int timeout = _desc->activationTimeout > 0 ? _desc->activationTimeout : _waitTime; + _node->getWaitQueue()->add(new WaitForActivationItem(this), IceUtil::Time::seconds(timeout)); + setStateNoSync(ServerI::WaitForActivation); + checkActivation(); + notifyAll(); + return true; + } + } + catch(const Ice::SyscallException& ex) + { + Ice::Warning out(_node->getTraceLevels()->logger); + out << "activation failed for server `" << _name << "':\n"; + out << ex; + + setState(ServerI::Inactive); + return false; + } +} + +void +ServerI::adapterActivated(const string& id) { Lock sync(*this); - return _adapters; + _activeAdapters.insert(id); + checkActivation(); +} + +void +ServerI::adapterDeactivated(const string& id) +{ + Lock sync(*this); + _activeAdapters.erase(id); +} + +void +ServerI::activationFailed(bool timeout) +{ + map<string, ServerAdapterIPtr> adapters; + { + Lock sync(*this); + if(_state != ServerI::WaitForActivation) + { + return; + } + + for(vector<AMD_Server_startPtr>::const_iterator p = _startCB.begin(); p != _startCB.end(); ++p) + { + (*p)->ice_response(false); + } + _startCB.clear(); + + if(_node->getTraceLevels()->server > 1) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); + if(timeout) + { + out << "server `" << _name << "' activation timed out"; + } + else + { + out << "server `" << _name << "' activation failed"; + } + } + adapters = _adapters; + } + + for(map<string, ServerAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + try + { + p->second->activationFailed(timeout); + } + catch(const Ice::ObjectNotExistException&) + { + } + } +} + +void +ServerI::addDynamicInfo(ServerDynamicInfoSeq& serverInfos, AdapterDynamicInfoSeq& adapterInfos) const +{ + // + // Add server info if it's not inactive. + // + ServerDynamicInfo info; + map<string, ServerAdapterIPtr> adapters; + { + Lock sync(*this); + if(_state == ServerI::Inactive) + { + return; + } + adapters = _adapters; + info.state = toServerState(_state); + } + info.name = _name; + info.pid = info.state == IceGrid::Active ? getPid() : 0; + serverInfos.push_back(info); + + // + // Add adapters info. + // + for(map<string, ServerAdapterIPtr>::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + try + { + AdapterDynamicInfo adapter; + adapter.id = p->first; + adapter.proxy = p->second->getDirectProxy(); + adapterInfos.push_back(adapter); + } + catch(const AdapterNotActiveException&) + { + } + catch(const Ice::ObjectNotExistException&) + { + } + } +} + +void +ServerI::checkActivation() +{ + //assert(locked()); + if(_state == ServerI::WaitForActivation) + { + for(AdapterDescriptorSeq::const_iterator p = _desc->adapters.begin(); p != _desc->adapters.end(); ++p) + { + if(!p->noWaitForActivation && _activeAdapters.find(p->id) == _activeAdapters.end()) + { + return; + } + } + setStateNoSync(ServerI::Active); + notifyAll(); + } } void @@ -464,7 +648,7 @@ ServerI::stopInternal(bool kill, const Ice::Current& current) { while(!_process) { - if(_state == Inactive || _state == Destroyed) + if(_state == ServerI::Inactive || _state == ServerI::Destroyed) { // // State changed to inactive or destroyed, the server @@ -504,12 +688,12 @@ ServerI::stopInternal(bool kill, const Ice::Current& current) Lock sync(*this); #ifndef NDEBUG - ServerState oldState = _state; + InternalServerState oldState = _state; #endif while(true) { - if(_state == Inactive || _state == Destroyed) + if(_state == ServerI::Inactive || _state == ServerI::Destroyed) { // // State changed to inactive or destroyed, the server @@ -521,7 +705,8 @@ ServerI::stopInternal(bool kill, const Ice::Current& current) // // Wait for a notification. // - bool notify = timedWait(IceUtil::Time::seconds(_waitTime)); + int timeout = _desc->deactivationTimeout > 0 ? _desc->deactivationTimeout : _waitTime; + bool notify = timedWait(IceUtil::Time::seconds(timeout)); if(!notify) { assert(oldState == _state); @@ -556,40 +741,51 @@ ServerI::stopInternal(bool kill, const Ice::Current& current) out << "deactivation failed for server `" << _name << "':\n"; out << ex; - setState(Active, current); + setState(ServerI::Active); } } void -ServerI::setState(ServerState st, const Ice::Current& current) +ServerI::setState(InternalServerState st) { Lock sync(*this); - setStateNoSync(st, current); + setStateNoSync(st); notifyAll(); } void -ServerI::setStateNoSync(ServerState st, const Ice::Current& current) +ServerI::setStateNoSync(InternalServerState st) { _state = st; + if(!_startCB.empty() && _state != ServerI::WaitForActivation) + { + for(vector<AMD_Server_startPtr>::const_iterator p = _startCB.begin(); p != _startCB.end(); ++p) + { + (*p)->ice_response(_state == ServerI::Active); + } + _startCB.clear(); + + _node->getWaitQueue()->notifyAllWaitingOn(this); + } + NodeObserverPrx observer = _node->getObserver(); if(observer) { ServerDynamicInfo info; info.name = _name; - info.state = st; + info.state = toServerState(st); // // NOTE: this must be done only for the active state. Otherwise, we could get a // deadlock since getPid() will lock the activator and since this method might // be called from the activator locked. // - info.pid = st == Active ? getPid(current) : 0; + info.pid = st == ServerI::Active ? getPid() : 0; try { - observer->updateServer(_node->getName(current), info); + observer->updateServer(_node->getName(), info); } catch(const Ice::LocalException&) { @@ -598,34 +794,39 @@ ServerI::setStateNoSync(ServerState st, const Ice::Current& current) if(_node->getTraceLevels()->server > 1) { - if(_state == Active) + if(_state == ServerI::Active) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Active'"; } - else if(_state == Inactive) + else if(_state == ServerI::Inactive) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Inactive'"; } - else if(_state == Destroyed) + else if(_state == ServerI::Destroyed) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Destroyed'"; } else if(_node->getTraceLevels()->server > 2) { - if(_state == Activating) + if(_state == ServerI::WaitForActivation) + { + Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); + out << "changed server `" << _name << "' state to `WaitForActivation'"; + } + else if(_state == ServerI::Activating) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Activating'"; } - else if(_state == Deactivating) + else if(_state == ServerI::Deactivating) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Deactivating'"; } - else if(_state == Destroying) + else if(_state == ServerI::Destroying) { Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->serverCat); out << "changed server `" << _name << "' state to `Destroying'"; @@ -637,8 +838,6 @@ ServerI::setStateNoSync(ServerState st, const Ice::Current& current) void ServerI::update(const ServerDescriptorPtr& descriptor, StringAdapterPrxDict& adapters, const Ice::Current& current) { - ServerPrx self = ServerPrx::uncheckedCast(current.adapter->createProxy(current.id)); - _desc = descriptor; _serverDir = _serversDir + "/" + descriptor->name; _activation = descriptor->activation == "on-demand" ? OnDemand : Manual; @@ -742,11 +941,11 @@ ServerI::update(const ServerDescriptorPtr& descriptor, StringAdapterPrxDict& ada // Create the object adapter objects if necessary. // _processRegistered = false; - StringAdapterPrxDict oldAdapters; + map<string, ServerAdapterIPtr> oldAdapters; oldAdapters.swap(_adapters); for(AdapterDescriptorSeq::const_iterator p = descriptor->adapters.begin(); p != descriptor->adapters.end(); ++p) { - addAdapter(*p, self, current); + adapters.insert(make_pair(p->id, addAdapter(*p, current))); oldAdapters.erase(p->id); } if(iceBox) @@ -756,36 +955,39 @@ ServerI::update(const ServerDescriptorPtr& descriptor, StringAdapterPrxDict& ada ServiceDescriptorPtr s = ServiceDescriptorPtr::dynamicCast(p->descriptor); for(AdapterDescriptorSeq::const_iterator q = s->adapters.begin(); q != s->adapters.end(); ++q) { - addAdapter(*q, self, current); + adapters.insert(make_pair(q->id, addAdapter(*q, current))); oldAdapters.erase(q->id); } } } - for(StringAdapterPrxDict::const_iterator p = oldAdapters.begin(); p != oldAdapters.end(); ++p) + for(map<string, ServerAdapterIPtr>::const_iterator p = oldAdapters.begin(); p != oldAdapters.end(); ++p) { try { - p->second->destroy(); + p->second->destroy(current); } catch(const Ice::LocalException&) { } } - adapters = _adapters; } -void -ServerI::addAdapter(const AdapterDescriptor& descriptor, const ServerPrx& self, const Ice::Current& current) +AdapterPrx +ServerI::addAdapter(const AdapterDescriptor& descriptor, const Ice::Current& current) { Ice::Identity id; id.category = "IceGridServerAdapter"; id.name = _desc->name + "-" + descriptor.id; - if(!current.adapter->find(id)) + AdapterPrx proxy = AdapterPrx::uncheckedCast(current.adapter->createProxy(id)); + ServerAdapterIPtr servant = ServerAdapterIPtr::dynamicCast(current.adapter->find(id)); + if(!servant) { - current.adapter->add(new ServerAdapterI(_node, self, descriptor.id, _waitTime), id); + servant = new ServerAdapterI(_node, this, proxy, descriptor.id, _waitTime); + current.adapter->add(servant, id); } - _adapters[descriptor.id] = AdapterPrx::uncheckedCast(current.adapter->createProxy(id)); + _adapters.insert(make_pair(descriptor.id, servant)); _processRegistered |= descriptor.registerProcess; + return proxy; } void @@ -954,3 +1156,27 @@ ServerI::createProperty(const string& name, const string& value) return prop; } +ServerState +ServerI::toServerState(InternalServerState st) const +{ + switch(st) + { + case ServerI::Inactive: + return IceGrid::Inactive; + case ServerI::Activating: + return IceGrid::Activating; + case ServerI::WaitForActivation: + return IceGrid::Activating; + case ServerI::Active: + return IceGrid::Active; + case ServerI::Deactivating: + return IceGrid::Deactivating; + case ServerI::Destroying: + return IceGrid::Destroying; + case ServerI::Destroyed: + return IceGrid::Destroyed; + default: + assert(false); + return IceGrid::Destroyed; + } +} |