diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 16 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 34 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.h | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 36 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 125 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.h | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 25 |
7 files changed, 144 insertions, 108 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index b7f26779a04..09e77531310 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -301,7 +301,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { try { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); } catch(const DeploymentException& ex) { @@ -516,14 +516,8 @@ Database::removeApplication(const string& name, AdminSessionI* session) if(_master) { - try - { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load)); - } - catch(const DeploymentException&) - { - // Ignore, this is traced by the node cache. - } + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow)); } _applicationObserverTopic->waitForSyncedSubscribers(serial); @@ -1468,7 +1462,7 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, // try { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); } catch(const DeploymentException& ex) { @@ -1485,7 +1479,7 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, try { - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait)); } catch(const DeploymentException& ex) { diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index dfd740c97e5..08fb3348763 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -350,16 +350,16 @@ NodeEntry::getInfo() const return _session->getInfo(); } -Ice::StringSeq +ServerEntrySeq NodeEntry::getServers() const { Lock sync(*this); - Ice::StringSeq names; + ServerEntrySeq entries; for(map<string, ServerEntryPtr>::const_iterator p = _servers.begin(); p != _servers.end(); ++p) { - names.push_back(p->second->getId()); + entries.push_back(p->second); } - return names; + return entries; } LoadInfo @@ -439,7 +439,7 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con // if(timeout > 0 && timeout != sessionTimeout) { - node = NodePrx::uncheckedCast(node->ice_timeout(sessionTimeout)); + node = NodePrx::uncheckedCast(node->ice_timeout(timeout + sessionTimeout)); } try @@ -486,17 +486,37 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con } void -NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info) +NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, int timeout) { try { + NodePrx node; + { + Lock sync(*this); + checkSession(); + node = _session->getNode(); + int sessionTimeout = _session->getTimeout(); + + // + // Check if we should use a specific timeout (the load + // call can deactivate the server and it can take some + // time to deactivate, up to "deactivation-timeout" + // seconds). + // + if(timeout > 0 && timeout != sessionTimeout) + { + node = NodePrx::uncheckedCast(node->ice_timeout(timeout + sessionTimeout)); + } + } + if(_cache.getTraceLevels() && _cache.getTraceLevels()->server > 2) { Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->serverCat); out << "unloading `" << info.descriptor->id << "' on node `" << _name << "'"; } + AMI_Node_destroyServerPtr amiCB = new DestroyCB(_cache.getTraceLevels(), entry, _name); - getProxy()->destroyServer_async(amiCB, info.descriptor->id, info.uuid, info.revision); + node->destroyServer_async(amiCB, info.descriptor->id, info.uuid, info.revision); } catch(const NodeUnreachableException& ex) { diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h index c61c1363ea3..8493921cb7e 100644 --- a/cpp/src/IceGrid/NodeCache.h +++ b/cpp/src/IceGrid/NodeCache.h @@ -28,6 +28,7 @@ typedef IceUtil::Handle<NodeSessionI> NodeSessionIPtr; class ServerEntry; typedef IceUtil::Handle<ServerEntry> ServerEntryPtr; +typedef std::vector<ServerEntryPtr> ServerEntrySeq; class ReplicaCache; @@ -48,13 +49,13 @@ public: NodePrx getProxy() const; NodeInfo getInfo() const; - Ice::StringSeq getServers() const; + ServerEntrySeq getServers() const; LoadInfo getLoadInfoAndLoadFactor(const std::string&, float&) const; bool canRemove(); void loadServer(const ServerEntryPtr&, const ServerInfo&, const SessionIPtr&, int); - void destroyServer(const ServerEntryPtr&, const ServerInfo&); + void destroyServer(const ServerEntryPtr&, const ServerInfo&, int); ServerInfo getServerInfo(const ServerInfo&, const SessionIPtr&); void __incRef(); diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index ee87d6ac16c..a86c04600df 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -112,24 +112,20 @@ NodeSessionI::loadServers_async(const AMD_NodeSession_loadServersPtr& amdCB, con // // Get the server proxies to load them on the node. // - Ice::StringSeq servers = _database->getNode(_info.name)->getServers(); - for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) - { - try - { - _database->getServer(*p)->load(); - } - catch(const Ice::UserException&) - { - // Ignore. - } - } + ServerEntrySeq servers = _database->getNode(_info.name)->getServers(); + for_each(servers.begin(), servers.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } Ice::StringSeq NodeSessionI::getServers(const Ice::Current& current) const { - return _database->getNode(_info.name)->getServers(); + ServerEntrySeq servers = _database->getNode(_info.name)->getServers(); + Ice::StringSeq names; + for(ServerEntrySeq::const_iterator p = servers.begin(); p != servers.end(); ++p) + { + names.push_back((*p)->getId()); + } + return names; } void @@ -208,18 +204,8 @@ NodeSessionI::destroyImpl(bool shutdown) _destroy = true; } - Ice::StringSeq servers = _database->getNode(_info.name)->getServers(); - for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) - { - try - { - _database->getServer(*p)->unload(); - } - catch(const Ice::UserException&) - { - // Ignore. - } - } + ServerEntrySeq servers = _database->getNode(_info.name)->getServers(); + for_each(servers.begin(), servers.end(), IceUtil::voidMemFun(&ServerEntry::unsync)); // // If the registry isn't being shutdown we remove the node diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index ba6a18f73a2..f8ddf356417 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -211,20 +211,42 @@ ServerEntry::ServerEntry(ServerCache& cache, const string& id) : } void -ServerEntry::load() +ServerEntry::sync() { + syncImpl(); +} + +void +ServerEntry::syncAndWait() +{ + syncImpl(); try { - syncImpl(true); + waitImpl(); } catch(const NodeUnreachableException&) { - // Ignore + // + // The node being unreachable isn't considered as a failure to + // synchronize the server. + // + } +} + +void +ServerEntry::waitNoThrow() +{ + try + { + waitImpl(); + } + catch(const Ice::Exception&) + { } } void -ServerEntry::unload() +ServerEntry::unsync() { Lock sync(*this); if(_loaded.get()) @@ -356,7 +378,12 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& while(true) { - syncImpl(true); + // + // Note that we don't call syncAndWait() because we want + // NodeUnreachableException exceptions to go through. + // + syncImpl(); + waitImpl(); { Lock sync(*this); @@ -402,7 +429,12 @@ ServerEntry::getAdapter(const string& id, bool upToDate) while(true) { - syncImpl(true); + // + // Note that we don't call syncAndWait() because we want + // NodeUnreachableException exceptions to go through. + // + syncImpl(); + waitImpl(); { Lock sync(*this); @@ -470,7 +502,7 @@ ServerEntry::getLoad(LoadSample sample) const } void -ServerEntry::syncImpl(bool waitForUpdate) +ServerEntry::syncImpl() { ServerInfo load; SessionIPtr session; @@ -481,17 +513,7 @@ ServerEntry::syncImpl(bool waitForUpdate) Lock sync(*this); if(_synchronizing) { - if(waitForUpdate) - { - while(_synchronizing) - { - wait(); - } - } - else - { - return; - } + return; } if(!_load.get() && !_destroy.get()) @@ -505,6 +527,7 @@ ServerEntry::syncImpl(bool waitForUpdate) if(_destroy.get()) { destroy = *_destroy; + timeout = _deactivationTimeout; } else if(_load.get()) { @@ -524,7 +547,7 @@ ServerEntry::syncImpl(bool waitForUpdate) { try { - _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy); + _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout); } catch(NodeNotExistException&) { @@ -541,39 +564,47 @@ ServerEntry::syncImpl(bool waitForUpdate) { exception(NodeUnreachableException(load.node, "node is not active")); } + } +} + +void +ServerEntry::waitImpl() +{ + Lock sync(*this); + while(_synchronizing) + { + wait(); } - - if(waitForUpdate) + + if(_exception.get()) { - Lock sync(*this); - while(_synchronizing) + try { - wait(); + _exception->ice_throw(); } - if(_exception.get()) + catch(const DeploymentException&) { - try - { - _exception->ice_throw(); - } - catch(const DeploymentException&) - { - throw; - } - catch(const NodeUnreachableException&) - { - throw; - } - catch(const Ice::Exception& ex) + throw; + } + catch(const NodeUnreachableException&) + { + throw; + } + catch(const Ice::Exception& ex) // This shouln't happen. + { + ostringstream os; + os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex; + TraceLevelsPtr traceLevels = _cache.getTraceLevels(); + if(traceLevels) { - ostringstream os; - os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex; - throw DeploymentException(os.str()); + Ice::Error err(traceLevels->logger); + err << os.str(); } + throw DeploymentException(os.str()); } } } - + void ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, int at, int dt) { @@ -626,7 +657,7 @@ ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, i { try { - _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy); + _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout); } catch(NodeNotExistException&) { @@ -842,7 +873,8 @@ ServerEntry::allocatedNoSync(const SessionIPtr& session) } } - syncImpl(true); // We sync here to ensure the "session" server will be activated. + sync(); + waitNoThrow(); } void @@ -933,6 +965,7 @@ ServerEntry::releasedNoSync(const SessionIPtr& session) return; } } - - syncImpl(true); // We sync here to ensure the "session" server will be shutdown. + + sync(); + waitNoThrow(); } diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h index 892e89d140a..1e934430c30 100644 --- a/cpp/src/IceGrid/ServerCache.h +++ b/cpp/src/IceGrid/ServerCache.h @@ -36,8 +36,10 @@ public: ServerEntry(ServerCache&, const std::string&); - void load(); - void unload(); + void sync(); + void syncAndWait(); + void waitNoThrow(); + void unsync(); void update(const ServerInfo&); void destroy(); @@ -64,8 +66,9 @@ public: private: - void syncImpl(bool); - + void syncImpl(); + void waitImpl(); + ServerCache& _cache; const std::string _id; std::auto_ptr<ServerInfo> _loaded; diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index a3e5871237e..daeef08ca53 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -363,13 +363,7 @@ DestroyCommand::DestroyCommand(const ServerIPtr& server, bool kill) : bool DestroyCommand::canExecute(ServerI::InternalServerState state) { - return - state == ServerI::Inactive || - state == ServerI::WaitForActivation || - state == ServerI::Active || - state == ServerI::ActivationTimeout || - state == ServerI::Deactivating || - state == ServerI::DeactivatingWaitForProcess; + return state == ServerI::Inactive; } ServerI::InternalServerState @@ -1051,9 +1045,14 @@ ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB, const string& uuid, int } } + if(!StopCommand::isStopped(_state) && !_stop) + { + _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout); + } if(!_destroy) { - _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching); + //_destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching); + _destroy = new DestroyCommand(this, false); } if(amdCB) { @@ -2097,13 +2096,13 @@ ServerCommandPtr ServerI::nextCommand() { ServerCommandPtr command; - if(_destroy && _destroy->canExecute(_state)) + if(_stop && _stop->canExecute(_state)) { - command = _destroy; + command = _stop; } - else if(_stop && _stop->canExecute(_state)) + else if(_destroy && _destroy->canExecute(_state)) { - command = _stop; + command = _destroy; } else if(_load && _load->canExecute(_state)) { @@ -2159,7 +2158,7 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason) assert(_state == Deactivating); break; case Destroying: - assert(_destroy && _destroy->canExecute(_state)); + assert(_state == Inactive && _destroy && _destroy->canExecute(_state)); break; case Destroyed: assert(_destroy); |