From 99894938dbde9a0bb10fc1998d9863cae52b8977 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 8 Jul 2005 13:47:30 +0000 Subject: More adapter replication changes. --- cpp/src/IceGrid/ServerCache.cpp | 512 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 512 insertions(+) create mode 100644 cpp/src/IceGrid/ServerCache.cpp (limited to 'cpp/src/IceGrid/ServerCache.cpp') diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp new file mode 100644 index 00000000000..0fc2ca76c5a --- /dev/null +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -0,0 +1,512 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include +#include +#include +#include + +using namespace std; +using namespace IceGrid; + +namespace IceGrid +{ + + struct AddComponent : std::unary_function + { + AddComponent(ServerCache& serverCache, const ServerEntryPtr& entry) : + _serverCache(serverCache), _entry(entry) + { + } + + void + operator()(const ComponentDescriptorPtr& desc) + { + _serverCache.addComponent(desc, _entry); + } + + ServerCache& _serverCache; + const ServerEntryPtr _entry; + }; + + struct RemoveComponent : std::unary_function + { + RemoveComponent(ServerCache& serverCache, const ServerEntryPtr& entry) : + _serverCache(serverCache), _entry(entry) + { + } + + void + operator()(const ComponentDescriptorPtr& desc) + { + _serverCache.removeComponent(desc, _entry); + } + + ServerCache& _serverCache; + const ServerEntryPtr _entry; + }; +} + +ServerCache::ServerCache(Database& db, NodeCache& nodeCache, AdapterCache& adapterCache, ObjectCache& objectCache) : + _database(db), _nodeCache(nodeCache), _adapterCache(adapterCache), _objectCache(objectCache) +{ +} + +ServerEntryPtr +ServerCache::add(const string& name, const ServerInstanceDescriptor& instance, const string& application) +{ + Lock sync(*this); + + ServerEntryPtr entry = getImpl(name, true); + entry->update(instance, application); + _nodeCache.get(instance.node, true)->addServer(entry); + + forEachComponent(AddComponent(*this, entry))(instance); + return entry; +} + +ServerEntryPtr +ServerCache::get(const string& name) +{ + Lock sync(*this); + ServerEntryPtr entry = getImpl(name); + if(!entry) + { + ServerNotExistException ex; + ex.name = name; + throw ex; + } + return entry; +} + +ServerEntryPtr +ServerCache::update(const ServerInstanceDescriptor& instance) +{ + Lock sync(*this); + + ServerEntryPtr entry = getImpl(instance.descriptor->name); + assert(entry); + + ServerInstanceDescriptor old = entry->getDescriptor(); + forEachComponent(RemoveComponent(*this, entry))(old); + + // + // If the node changed, move the server from the old node to the + // new one. + // + if(old.node != instance.node) + { + _nodeCache.get(old.node)->removeServer(entry); + _nodeCache.get(instance.node)->addServer(entry); + } + + forEachComponent(AddComponent(*this, entry))(instance); + entry->update(instance); + + return entry; +} + +ServerEntryPtr +ServerCache::remove(const string& name) +{ + Lock sync(*this); + + ServerEntryPtr entry = getImpl(name); + ServerInstanceDescriptor instance = entry->getDescriptor(); + entry->destroy(); + _nodeCache.get(instance.node)->removeServer(entry); + + forEachComponent(RemoveComponent(*this, entry))(instance); + + return entry; +} + +void +ServerCache::clear(const string& name) +{ + Lock sync(*this); + CacheByString::removeImpl(name); +} + +Database& +ServerCache::getDatabase() const +{ + return _database; +} + +void +ServerCache::addComponent(const ComponentDescriptorPtr& component, const ServerEntryPtr& entry) +{ + for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) + { + _adapterCache.get(q->id, true)->addServer(entry); + for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) + { + _objectCache.add(q->id, *r); + } + } +} + +void +ServerCache::removeComponent(const ComponentDescriptorPtr& component, const ServerEntryPtr& entry) +{ + for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) + { + _adapterCache.get(q->id)->removeServer(entry); + for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) + { + _objectCache.remove(r->id); + } + } +} + +ServerEntry::ServerEntry(Cache& cache, const string& name) : + _cache(*dynamic_cast(&cache)), + _name(name), + _synchronizing(false) +{ +} + +void +ServerEntry::sync() +{ + map adapters; + int at, dt; + try + { + sync(adapters, at, dt); + } + catch(const NodeUnreachableException&) + { + } +} + +bool +ServerEntry::needsSync() const +{ + Lock sync(*this); + return _failed; +} + +void +ServerEntry::update(const ServerInstanceDescriptor& instance, const std::string& application) +{ + Lock sync(*this); + + auto_ptr descriptor(new ServerInstanceDescriptor()); + *descriptor = instance; + + if(!application.empty()) + { + _application = application; + } + + if(_loaded.get() && descriptor->node != _loaded->node) + { + assert(!_destroy.get()); + _destroy = _loaded; + } + else if(_load.get() && descriptor->node != _load->node) + { + assert(!_destroy.get()); + _destroy = _load; + } + + _load = descriptor; + _loaded.reset(0); + _proxy = 0; + _adapters.clear(); +} + +void +ServerEntry::destroy() +{ + Lock sync(*this); + + assert(_loaded.get() || _load.get()); + if(_loaded.get()) + { + assert(!_destroy.get()); + _destroy = _loaded; + } + else if(_load.get()) + { + assert(!_destroy.get()); + _destroy = _load; + } + + _load.reset(0); + _loaded.reset(0); + _proxy = 0; + _adapters.clear(); +} + +ServerInstanceDescriptor +ServerEntry::getDescriptor() const +{ + Lock sync(*this); + if(!_loaded.get() && !_load.get()) + { + throw ServerNotExistException(); + } + if(_proxy) + { + return *_loaded.get(); + } + else + { + return *_load.get(); + } +} + +string +ServerEntry::getApplication() const +{ + Lock sync(*this); + if(!_loaded.get() && !_load.get()) + { + throw ServerNotExistException(); + } + return _application; +} + +string +ServerEntry::getName() const +{ + return _name; +} + +ServerPrx +ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout) +{ + ServerPrx proxy; + { + Lock sync(*this); + if(_proxy) // Synced + { + proxy = _proxy; + activationTimeout = _activationTimeout; + deactivationTimeout = _deactivationTimeout; + } + } + + if(proxy) + { + try + { + proxy->ice_ping(); + return proxy; + } + catch(const Ice::LocalException& ex) + { + } + } + + StringAdapterPrxDict adapters; + proxy = sync(adapters, activationTimeout, deactivationTimeout); + if(!proxy) + { + throw ServerNotExistException(); + } + return proxy; +} + +AdapterPrx +ServerEntry::getAdapter(const string& id) +{ + AdapterPrx proxy; + { + Lock sync(*this); + if(_proxy) // Synced + { + proxy = _adapters[id]; + } + } + + if(proxy) + { + try + { + proxy->ice_ping(); + return proxy; + } + catch(const Ice::LocalException& ex) + { + } + } + + StringAdapterPrxDict adapters; + int activationTimeout, deactivationTimeout; + sync(adapters, activationTimeout, deactivationTimeout); + AdapterPrx adapter = adapters[id]; + if(!adapter) + { + throw AdapterNotExistException(); + } + return adapter; +} + +ServerPrx +ServerEntry::sync(map& adapters, int& activationTimeout, int& deactivationTimeout) +{ + ServerDescriptorPtr load; + string loadNode; + ServerDescriptorPtr destroy; + string destroyNode; + { + Lock sync(*this); + while(_synchronizing) + { + wait(); + } + + if(!_load.get() && !_destroy.get()) + { + _load = _loaded; // Re-load the current server. + } + + _synchronizing = true; + _failed = false; + if(_load.get()) + { + load = _load->descriptor; + loadNode = _load->node; + } + if(_destroy.get()) + { + destroy = _destroy->descriptor; + destroyNode = _destroy->node; + } + } + + ServerPrx proxy; + Database& db = _cache.getDatabase(); + try + { + if(destroy) + { + try + { + db.getNode(destroyNode)->destroyServer(destroy->name); + } + catch(const NodeNotExistException& ex) + { + if(!load) + { + throw NodeUnreachableException(); + } + } + catch(const Ice::LocalException& ex) + { + if(!load) + { + throw NodeUnreachableException(); + } + } + } + + if(load) + { + try + { + proxy = db.getNode(loadNode)->loadServer(load, adapters, activationTimeout, deactivationTimeout); + proxy = ServerPrx::uncheckedCast(proxy->ice_collocationOptimization(false)); + } + catch(const NodeNotExistException& ex) + { + throw NodeUnreachableException(); + } + catch(const DeploymentException& ex) + { + Ice::Warning out(db.getTraceLevels()->logger); + out << "failed to load server on node `" << loadNode << "':\n" << ex; + throw NodeUnreachableException(); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(db.getTraceLevels()->logger); + out << "unexpected exception while loading on node `" << loadNode << "':\n" << ex; + throw NodeUnreachableException(); + } + } + } + catch(const NodeUnreachableException& ex) + { + { + Lock sync(*this); + _synchronizing = false; + _destroy.reset(0); + _failed = true; + notifyAll(); + } + if(!load && destroy) + { + _cache.clear(destroy->name); + } + throw; + } + + { + Lock sync(*this); + _synchronizing = false; + _loaded = _load; + _load.reset(0); + _destroy.reset(0); + + // + // Set timeout on server and adapter proxies. Most of the + // calls on the proxies shouldn't block for longer than the + // node session timeout. Calls that might block for a longer + // time should set the correct timeout before invoking on the + // proxy (e.g.: server start/stop, adapter activate). + // + if(proxy) + { + int timeout = db.getNodeSessionTimeout() * 1000; // sec to ms + _proxy = ServerPrx::uncheckedCast(proxy->ice_timeout(timeout)); + _adapters.clear(); + for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(timeout)); + _adapters.insert(make_pair(p->first, adapter)); + } + activationTimeout += timeout; + deactivationTimeout += timeout; + _activationTimeout = activationTimeout; + _deactivationTimeout = deactivationTimeout; + } + else + { + _proxy = 0; + _adapters.clear(); + _activationTimeout = 0; + _deactivationTimeout = 0; + } + notifyAll(); + } + if(!load && destroy) + { + _cache.clear(destroy->name); + } + return proxy; +} + +bool +ServerEntry::isDestroyed() +{ + Lock sync(*this); + return !_loaded.get() && !_load.get(); +} + +bool +ServerEntry::canRemove() +{ + Lock sync(*this); + return !_loaded.get() && !_load.get() && !_destroy.get(); +} -- cgit v1.2.3