// ********************************************************************** // // Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include using namespace std; using namespace IceGrid; namespace IceGrid { struct AddComponent : std::unary_function { AddComponent(ServerCache& serverCache, const ServerEntryPtr& entry) : _serverCache(serverCache), _entry(entry) { } void operator()(const ComponentDescriptorPtr& desc) { _serverCache.addComponent(desc, _entry); } ServerCache& _serverCache; const ServerEntryPtr _entry; }; struct RemoveComponent : std::unary_function { RemoveComponent(ServerCache& serverCache, const ServerEntryPtr& entry) : _serverCache(serverCache), _entry(entry) { } void operator()(const ComponentDescriptorPtr& desc) { _serverCache.removeComponent(desc, _entry); } ServerCache& _serverCache; const ServerEntryPtr _entry; }; } ServerCache::ServerCache(Database& db, NodeCache& nodeCache, AdapterCache& adapterCache, ObjectCache& objectCache) : _database(db), _nodeCache(nodeCache), _adapterCache(adapterCache), _objectCache(objectCache) { } ServerEntryPtr ServerCache::add(const string& name, const ServerInstanceDescriptor& instance, const string& application) { Lock sync(*this); ServerEntryPtr entry = getImpl(name, true); entry->update(instance, application); _nodeCache.get(instance.node, true)->addServer(entry); forEachComponent(AddComponent(*this, entry))(instance); return entry; } ServerEntryPtr ServerCache::get(const string& name) { Lock sync(*this); ServerEntryPtr entry = getImpl(name); if(!entry) { ServerNotExistException ex; ex.name = name; throw ex; } return entry; } ServerEntryPtr ServerCache::update(const ServerInstanceDescriptor& instance) { Lock sync(*this); ServerEntryPtr entry = getImpl(instance.descriptor->name); assert(entry); ServerInstanceDescriptor old = entry->getDescriptor(); forEachComponent(RemoveComponent(*this, entry))(old); // // If the node changed, move the server from the old node to the // new one. // if(old.node != instance.node) { _nodeCache.get(old.node)->removeServer(entry); _nodeCache.get(instance.node, true)->addServer(entry); } forEachComponent(AddComponent(*this, entry))(instance); entry->update(instance); return entry; } ServerEntryPtr ServerCache::remove(const string& name) { Lock sync(*this); ServerEntryPtr entry = getImpl(name); ServerInstanceDescriptor instance = entry->getDescriptor(); entry->destroy(); _nodeCache.get(instance.node)->removeServer(entry); forEachComponent(RemoveComponent(*this, entry))(instance); return entry; } void ServerCache::clear(const string& name) { Lock sync(*this); CacheByString::removeImpl(name); } Database& ServerCache::getDatabase() const { return _database; } void ServerCache::addComponent(const ComponentDescriptorPtr& component, const ServerEntryPtr& entry) { for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) { _adapterCache.get(q->id, true)->addServer(entry); for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { _objectCache.add(q->id, *r); } } } void ServerCache::removeComponent(const ComponentDescriptorPtr& component, const ServerEntryPtr& entry) { for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) { _adapterCache.get(q->id)->removeServer(entry); for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { _objectCache.remove(r->id); } } } ServerEntry::ServerEntry(Cache& cache, const string& name) : _cache(*dynamic_cast(&cache)), _name(name), _synchronizing(false) { } void ServerEntry::sync() { map adapters; int at, dt; try { sync(adapters, at, dt); } catch(const NodeUnreachableException&) { } } bool ServerEntry::needsSync() const { Lock sync(*this); return _failed; } void ServerEntry::update(const ServerInstanceDescriptor& instance, const std::string& application) { Lock sync(*this); auto_ptr descriptor(new ServerInstanceDescriptor()); *descriptor = instance; if(!application.empty()) { _application = application; } if(_loaded.get() && descriptor->node != _loaded->node) { assert(!_destroy.get()); _destroy = _loaded; } else if(_load.get() && descriptor->node != _load->node) { assert(!_destroy.get()); _destroy = _load; } _load = descriptor; _loaded.reset(0); _proxy = 0; _adapters.clear(); } void ServerEntry::destroy() { Lock sync(*this); assert(_loaded.get() || _load.get()); if(_loaded.get()) { assert(!_destroy.get()); _destroy = _loaded; } else if(_load.get()) { assert(!_destroy.get()); _destroy = _load; } _load.reset(0); _loaded.reset(0); _proxy = 0; _adapters.clear(); } ServerInstanceDescriptor ServerEntry::getDescriptor() const { Lock sync(*this); if(!_loaded.get() && !_load.get()) { throw ServerNotExistException(); } if(_proxy) { return *_loaded.get(); } else { return *_load.get(); } } string ServerEntry::getApplication() const { Lock sync(*this); if(!_loaded.get() && !_load.get()) { throw ServerNotExistException(); } return _application; } string ServerEntry::getName() const { return _name; } ServerPrx ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout) { ServerPrx proxy; { Lock sync(*this); if(_proxy) // Synced { proxy = _proxy; activationTimeout = _activationTimeout; deactivationTimeout = _deactivationTimeout; } } if(proxy) { try { proxy->ice_ping(); return proxy; } catch(const Ice::LocalException& ex) { } } StringAdapterPrxDict adapters; proxy = sync(adapters, activationTimeout, deactivationTimeout); if(!proxy) { throw ServerNotExistException(); } return proxy; } AdapterPrx ServerEntry::getAdapter(const string& id) { AdapterPrx proxy; { Lock sync(*this); if(_proxy) // Synced { proxy = _adapters[id]; } } if(proxy) { try { proxy->ice_ping(); return proxy; } catch(const Ice::LocalException& ex) { } } StringAdapterPrxDict adapters; int activationTimeout, deactivationTimeout; sync(adapters, activationTimeout, deactivationTimeout); AdapterPrx adapter = adapters[id]; if(!adapter) { throw AdapterNotExistException(); } return adapter; } ServerPrx ServerEntry::sync(map& adapters, int& activationTimeout, int& deactivationTimeout) { ServerDescriptorPtr load; string loadNode; ServerDescriptorPtr destroy; string destroyNode; { Lock sync(*this); while(_synchronizing) { wait(); } if(!_load.get() && !_destroy.get()) { _load = _loaded; // Re-load the current server. } _synchronizing = true; _failed = false; if(_load.get()) { load = _load->descriptor; loadNode = _load->node; } if(_destroy.get()) { destroy = _destroy->descriptor; destroyNode = _destroy->node; } } ServerPrx proxy; Database& db = _cache.getDatabase(); try { if(destroy) { try { db.getNode(destroyNode)->destroyServer(destroy->name); } catch(const NodeNotExistException& ex) { if(!load) { throw NodeUnreachableException(); } } catch(const Ice::LocalException& ex) { if(!load) { throw NodeUnreachableException(); } } } if(load) { try { proxy = db.getNode(loadNode)->loadServer(load, adapters, activationTimeout, deactivationTimeout); proxy = ServerPrx::uncheckedCast(proxy->ice_collocationOptimization(false)); } catch(const NodeNotExistException& ex) { throw NodeUnreachableException(); } catch(const DeploymentException& ex) { Ice::Warning out(db.getTraceLevels()->logger); out << "failed to load server on node `" << loadNode << "':\n" << ex; throw NodeUnreachableException(); } catch(const Ice::LocalException& ex) { Ice::Warning out(db.getTraceLevels()->logger); out << "unexpected exception while loading on node `" << loadNode << "':\n" << ex; throw NodeUnreachableException(); } } } catch(const NodeUnreachableException& ex) { { Lock sync(*this); _synchronizing = false; _destroy.reset(0); _failed = true; notifyAll(); } if(!load && destroy) { _cache.clear(destroy->name); } throw; } { Lock sync(*this); _synchronizing = false; _loaded = _load; _load.reset(0); _destroy.reset(0); // // Set timeout on server and adapter proxies. Most of the // calls on the proxies shouldn't block for longer than the // node session timeout. Calls that might block for a longer // time should set the correct timeout before invoking on the // proxy (e.g.: server start/stop, adapter activate). // if(proxy) { int timeout = db.getNodeSessionTimeout() * 1000; // sec to ms _proxy = ServerPrx::uncheckedCast(proxy->ice_timeout(timeout)); _adapters.clear(); for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(timeout)); _adapters.insert(make_pair(p->first, adapter)); } activationTimeout += timeout; deactivationTimeout += timeout; _activationTimeout = activationTimeout; _deactivationTimeout = deactivationTimeout; } else { _proxy = 0; _adapters.clear(); _activationTimeout = 0; _deactivationTimeout = 0; } notifyAll(); } if(!load && destroy) { _cache.clear(destroy->name); } return proxy; } bool ServerEntry::isDestroyed() { Lock sync(*this); return !_loaded.get() && !_load.get(); } bool ServerEntry::canRemove() { Lock sync(*this); return !_loaded.get() && !_load.get() && !_destroy.get(); }