From 99894938dbde9a0bb10fc1998d9863cae52b8977 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 8 Jul 2005 13:47:30 +0000 Subject: More adapter replication changes. --- cpp/src/IceGrid/Database.cpp | 831 +++++++++++-------------------------------- 1 file changed, 217 insertions(+), 614 deletions(-) (limited to 'cpp/src/IceGrid/Database.cpp') diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index ffe0ba34210..67d4c9aab9c 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -29,25 +29,9 @@ const string Database::_objectDbName = "objects"; namespace IceGrid { -struct AddComponent : std::unary_function -{ - AddComponent(Database& database, const Database::ServerEntryPtr& entry) : _database(database), _entry(entry) - { - } - - void - operator()(const ComponentDescriptorPtr& desc) - { - _database.addComponent(_entry, desc); - } - - Database& _database; - const Database::ServerEntryPtr _entry; -}; - struct AddAdapterId : std::unary_function { - AddAdapterId(set& ids) : _ids(ids) + AddAdapterId(set& ids, set& replicatedIds) : _ids(ids), _replicatedIds(replicatedIds) { } @@ -59,19 +43,39 @@ struct AddAdapterId : std::unary_function if(p->id.empty()) { DeploymentException ex; - ex.reason = "invalid descriptor: empty adapter id for adapter `" + p->name + "' in `" + desc->name - + "'"; + ex.reason = "empty adapter id for adapter `" + p->name + "' in `" + desc->name + "'"; throw ex; } - if(!_ids.insert(p->id).second) + if(!_ids.insert(p->id).second && _replicatedIds.find(p->id) == _replicatedIds.end()) { DeploymentException ex; - ex.reason = "invalid descriptor: duplicated adapter id `" + p->id + "'"; + ex.reason = "duplicated adapter id `" + p->id + "'"; throw ex; } } } + set& _ids; + const set& _replicatedIds; +}; + +struct AddReplicatedAdapterId : std::unary_function +{ + AddReplicatedAdapterId(set& ids) : _ids(ids) + { + } + + void + operator()(const ReplicatedAdapterDescriptor& desc) + { + if(!_ids.insert(desc.id).second) + { + DeploymentException ex; + ex.reason = "duplicated replicated adapter id `" + desc.id + "'"; + throw ex; + } + } + set& _ids; }; @@ -88,17 +92,10 @@ struct AddObjectId : std::unary_function { for(ObjectDescriptorSeq::const_iterator q = p->objects.begin(); q != p->objects.end(); ++q) { - if(!q->proxy) - { - DeploymentException ex; - ex.reason = "invalid descriptor: object proxy is null in `" + desc->name + "'"; - throw ex; - } - if(!_ids.insert(q->proxy->ice_getIdentity()).second) + if(!_ids.insert(q->id).second) { DeploymentException ex; - ex.reason = "invalid descriptor: duplicated object id `" + - Ice::identityToString(q->proxy->ice_getIdentity()) + "'"; + ex.reason = "duplicated object id `" + Ice::identityToString(q->id) + "'"; throw ex; } } @@ -192,6 +189,8 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter, _envName(envName), _nodeSessionTimeout(nodeSessionTimeout), _traceLevels(traceLevels), + _objectCache(_communicator), + _serverCache(*this, _nodeCache, _adapterCache, _objectCache), _connection(Freeze::createConnection(adapter->getCommunicator(), envName)), _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), @@ -204,10 +203,16 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter, _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter"); // - // Cache the servers. + // Cache the servers & adapters. // for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { + for(ReplicatedAdapterDescriptorSeq::const_iterator r = p->second->replicatedAdapters.begin(); + r != p->second->replicatedAdapters.end(); ++r) + { + _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing); + } + ServerInstanceDescriptorSeq::const_iterator q; for(q = p->second->servers.begin(); q != p->second->servers.end(); ++q) { @@ -329,9 +334,25 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD ex.reason = "server `" + e.name + "' is already registered"; throw ex; } + + set replicatedAdapterIds; + AddReplicatedAdapterId addReplicatedAdpt(replicatedAdapterIds); + for_each(descriptor->replicatedAdapters.begin(), descriptor->replicatedAdapters.end(), addReplicatedAdpt); + try + { + ObjFunc func = objFunc(*this, &Database::checkAdapterForAddition); + for_each(replicatedAdapterIds.begin(), replicatedAdapterIds.end(), func); + } + catch(const AdapterExistsException& e) + { + DeploymentException ex; + ex.reason = "replicated adapter `" + e.id + "' is already registered"; + throw ex; + } set adapterIds; - for_each(descriptor->servers.begin(), descriptor->servers.end(), forEachComponent(AddAdapterId(adapterIds))); + AddAdapterId addAdpt(adapterIds, replicatedAdapterIds); + for_each(descriptor->servers.begin(), descriptor->servers.end(), forEachComponent(addAdpt)); try { for_each(adapterIds.begin(), adapterIds.end(), objFunc(*this, &Database::checkAdapterForAddition)); @@ -356,6 +377,15 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD throw ex; } + // + // Register the replicated adapters. + // + for(ReplicatedAdapterDescriptorSeq::const_iterator p = descriptor->replicatedAdapters.begin(); + p != descriptor->replicatedAdapters.end(); ++p) + { + _adapterCache.get(p->id, true)->enableReplication(p->loadBalancing); + } + // // Register the application servers. // @@ -383,7 +413,7 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD // // Synchronize the servers on the nodes. // - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } void @@ -440,7 +470,7 @@ Database::updateApplicationDescriptor(ObserverSessionI* session, const Applicati out << "updated application `" << update.name << "'"; } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } void @@ -495,7 +525,7 @@ Database::syncApplicationDescriptor(ObserverSessionI* session, const Application out << "synced application `" << newDesc->name << "'"; } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } void @@ -527,14 +557,42 @@ Database::syncApplicationDescriptorNoSync(const ApplicationDescriptorPtr& origDe throw ex; } + // + // Ensure that the new application replicated adapters aren't + // already registered. + // + set oldReplicatedAdapterIds; + set newReplicatedAdapterIds; + AddReplicatedAdapterId addOldReplicatedAdpt(oldReplicatedAdapterIds); + for_each(origDesc->replicatedAdapters.begin(), origDesc->replicatedAdapters.end(), addOldReplicatedAdpt); + AddReplicatedAdapterId addNewReplicatedAdpt(newReplicatedAdapterIds); + for_each(newDesc->replicatedAdapters.begin(), newDesc->replicatedAdapters.end(), addNewReplicatedAdpt); + + set addedReplicatedAdpts; + set_difference(newReplicatedAdapterIds.begin(), newReplicatedAdapterIds.end(), oldReplicatedAdapterIds.begin(), + oldReplicatedAdapterIds.end(), set_inserter(addedReplicatedAdpts)); + try + { + ObjFunc func = objFunc(*this, &Database::checkAdapterForAddition); + for_each(addedReplicatedAdpts.begin(), addedReplicatedAdpts.end(), func); + } + catch(const AdapterExistsException& e) + { + DeploymentException ex; + ex.reason = "replicated adapter `" + e.id + "' is already registered"; + throw ex; + } + // // Ensure that the new application adapters aren't already // registered. // set oldAdpts; set newAdpts; - for_each(origDesc->servers.begin(), origDesc->servers.end(), forEachComponent(AddAdapterId(oldAdpts))); - for_each(newDesc->servers.begin(), newDesc->servers.end(), forEachComponent(AddAdapterId(newAdpts))); + AddAdapterId addOldAdpt(oldAdpts, oldReplicatedAdapterIds); + for_each(origDesc->servers.begin(), origDesc->servers.end(), forEachComponent(addOldAdpt)); + AddAdapterId addNewAdpt(newAdpts, newReplicatedAdapterIds); + for_each(newDesc->servers.begin(), newDesc->servers.end(), forEachComponent(addNewAdpt)); set addedAdpts; set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), set_inserter(addedAdpts)); @@ -571,6 +629,21 @@ Database::syncApplicationDescriptorNoSync(const ApplicationDescriptorPtr& origDe throw ex; } + // + // Update the replicated adapters. + // + for(ReplicatedAdapterDescriptorSeq::const_iterator p = origDesc->replicatedAdapters.begin(); + p != origDesc->replicatedAdapters.end(); ++p) + { + _adapterCache.get(p->id)->disableReplication(); + } + for(ReplicatedAdapterDescriptorSeq::const_iterator p = newDesc->replicatedAdapters.begin(); + p != newDesc->replicatedAdapters.end(); ++p) + { + _adapterCache.get(p->id, true)->enableReplication(p->loadBalancing); + } + + // // Register the new servers, unregister the old ones and // update the updated ones. @@ -601,6 +674,12 @@ Database::removeApplicationDescriptor(ObserverSessionI* session, const std::stri descriptor = p->second; _descriptors.erase(p); + for(ReplicatedAdapterDescriptorSeq::const_iterator q = descriptor->replicatedAdapters.begin(); + q != descriptor->replicatedAdapters.end(); ++q) + { + _adapterCache.get(q->id)->disableReplication(); + } + set servers; for_each(descriptor->servers.begin(), descriptor->servers.end(), AddServerName(servers)); removeServers(descriptor->name, descriptor->servers, servers, entries); @@ -619,7 +698,7 @@ Database::removeApplicationDescriptor(ObserverSessionI* session, const std::stri out << "removed application `" << name << "'"; } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } ApplicationDescriptorPtr @@ -648,81 +727,21 @@ Database::getAllApplications(const string& expression) } void -Database::addNode(const string& name, const NodeSessionIPtr& node) +Database::addNode(const string& name, const NodeSessionIPtr& session) { - ServerEntrySeq entries; - { - Lock sync(*this); - - if(_nodes.find(name) != _nodes.end()) - { - throw NodeActiveException(); - } - - if(_traceLevels->node > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "added node `" << name << "'"; - } - - _nodes.insert(make_pair(name, node)); - - // - // Get all the node servers and see if they need to be synced. - // - map >::const_iterator p = _serversByNode.find(name); - if(p == _serversByNode.end()) - { - return; - } - for(set::const_iterator q = p->second.begin() ; q != p->second.end(); ++q) - { - ServerEntryPtr entry = _servers[*q]; - assert(entry); - if(entry->needsSync()) - { - entries.push_back(entry); - } - } - } - - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); + _nodeCache.get(name, true)->setSession(session); } NodePrx Database::getNode(const string& name) const { - Lock sync(*this); - - map::const_iterator p = _nodes.find(name); - if(p == _nodes.end()) - { - if(_serversByNode.find(name) == _serversByNode.end()) - { - throw NodeNotExistException(); - } - else - { - throw NodeUnreachableException(); - } - } - return p->second->getNode(); + return _nodeCache.get(name)->getProxy(); } void Database::removeNode(const string& name) { - { - Lock sync(*this); - if(_nodes.erase(name) > 0) - { - if(_traceLevels->node > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "removed node `" << name << "'"; - } - } - } + _nodeCache.get(name)->setSession(0); try { @@ -737,13 +756,7 @@ Database::removeNode(const string& name) Ice::StringSeq Database::getAllNodes(const string& expression) { - Lock sync(*this); - set nodes; - Ice::StringSeq r = getMatchingKeys > >(_serversByNode, expression); - nodes.insert(r.begin(), r.end()); - r = getMatchingKeys >(_nodes, expression); - nodes.insert(r.begin(), r.end()); - return Ice::StringSeq(nodes.begin(), nodes.end()); + return _nodeCache.getAll(expression); } ServerInstanceDescriptor @@ -767,16 +780,7 @@ Database::getServerDescriptor(const std::string& name) string Database::getServerApplication(const string& name) { - Lock sync(*this); - map::const_iterator p = _applicationsByServerName.find(name); - if(p == _applicationsByServerName.end()) - { - ServerNotExistException ex; - ex.name = name; - throw ex; - } - - return p->second; + return _serverCache.get(name)->getApplication(); } ServerPrx @@ -789,41 +793,19 @@ Database::getServer(const string& name) ServerPrx Database::getServerWithTimeouts(const string& name, int& activationTimeout, int& deactivationTimeout) { - ServerEntryPtr entry; - { - Lock sync(*this); - map::const_iterator p = _servers.find(name); - if(p != _servers.end()) - { - entry = p->second; - } - } - if(!entry) - { - ServerNotExistException ex; - ex.name = name; - throw ex; - } - return entry->getProxy(activationTimeout, deactivationTimeout); + return _serverCache.get(name)->getProxy(activationTimeout, deactivationTimeout); } Ice::StringSeq Database::getAllServers(const string& expression) { - Lock sync(*this); - return getMatchingKeys >(_servers, expression); + return _serverCache.getAll(expression); } Ice::StringSeq Database::getAllNodeServers(const string& node) { - Lock sync(*this); - map >::const_iterator p = _serversByNode.find(node); - if(p == _serversByNode.end()) - { - return Ice::StringSeq(); - } - return Ice::StringSeq(p->second.begin(), p->second.end()); + return _nodeCache.get(node)->getServers(); } void @@ -875,7 +857,7 @@ Database::getAdapterDirectProxy(const string& id) } AdapterPrx -Database::getAdapter(const string& id) +Database::getAdapter(const string& id, const string& serverId) { // // TODO: Perhaps we should also cache the adapter proxies here @@ -887,18 +869,12 @@ Database::getAdapter(const string& id) // server, if that's the case we get the adapter proxy from the // server. // - ServerEntryPtr entry; + try { - Lock sync(*this); - map::const_iterator p = _serversByAdapterId.find(id); - if(p != _serversByAdapterId.end()) - { - entry = p->second; - } + return _adapterCache.get(id)->getProxy(serverId); } - if(entry) + catch(const AdapterNotExistException&) { - return entry->getAdapter(id); } // @@ -925,8 +901,7 @@ Database::getAllAdapters(const string& expression) { Lock sync(*this); vector result; - vector ids; - ids = getMatchingKeys >(_serversByAdapterId, expression); + vector ids = _adapterCache.getAll(expression); result.swap(ids); ids = getMatchingKeys(_adapters, expression); result.insert(result.end(), ids.begin(), ids.end()); @@ -934,18 +909,18 @@ Database::getAllAdapters(const string& expression) } void -Database::addObjectDescriptor(const ObjectDescriptor& object) +Database::addObject(const ObjectInfo& info) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); - const Ice::Identity id = object.proxy->ice_getIdentity(); + IdentityObjectInfoDict objects(connection, _objectDbName); + const Ice::Identity id = info.proxy->ice_getIdentity(); if(objects.find(id) != objects.end()) { ObjectExistsException ex; ex.id = id; throw ex; } - objects.put(make_pair(id, object)); + objects.put(make_pair(id, info)); if(_traceLevels->object > 0) { @@ -955,10 +930,10 @@ Database::addObjectDescriptor(const ObjectDescriptor& object) } void -Database::removeObjectDescriptor(const Ice::Identity& id) +Database::removeObject(const Ice::Identity& id) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); + IdentityObjectInfoDict objects(connection, _objectDbName); if(objects.find(id) == objects.end()) { ObjectNotExistException ex; @@ -975,21 +950,21 @@ Database::removeObjectDescriptor(const Ice::Identity& id) } void -Database::updateObjectDescriptor(const Ice::ObjectPrx& proxy) +Database::updateObject(const Ice::ObjectPrx& proxy) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); + IdentityObjectInfoDict objects(connection, _objectDbName); const Ice::Identity id = proxy->ice_getIdentity(); - IdentityObjectDescDict::iterator p = objects.find(id); + IdentityObjectInfoDict::iterator p = objects.find(id); if(p == objects.end()) { ObjectNotExistException ex; ex.id = id; throw ex; } - ObjectDescriptor desc = p->second; - desc.proxy = proxy; - p.set(desc); + ObjectInfo info = p->second; + info.proxy = proxy; + p.set(info); if(_traceLevels->object > 0) { @@ -998,19 +973,30 @@ Database::updateObjectDescriptor(const Ice::ObjectPrx& proxy) } } -ObjectDescriptor -Database::getObjectDescriptor(const Ice::Identity& id) +Ice::ObjectPrx +Database::getObjectProxy(const Ice::Identity& id, string& adapterId) { + try + { + ObjectEntryPtr object = _objectCache.get(id); + adapterId = object->getAdapterId(); + return object->getProxy(); + } + catch(ObjectNotExistException&) + { + } + Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); - IdentityObjectDescDict::const_iterator p = objects.find(id); + IdentityObjectInfoDict objects(connection, _objectDbName); + IdentityObjectInfoDict::const_iterator p = objects.find(id); if(p == objects.end()) { ObjectNotExistException ex; ex.id = id; throw ex; } - return p->second; + adapterId = ""; + return p->second.proxy; } Ice::ObjectPrx @@ -1023,10 +1009,11 @@ Database::getObjectByType(const string& type) Ice::ObjectProxySeq Database::getObjectsWithType(const string& type) { + Ice::ObjectProxySeq proxies = _objectCache.getObjectsWithType(type); + Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); - Ice::ObjectProxySeq proxies; - for(IdentityObjectDescDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) + IdentityObjectInfoDict objects(connection, _objectDbName); + for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) { proxies.push_back(p->second.proxy); } @@ -1037,27 +1024,63 @@ Database::getObjectsWithType(const string& type) return proxies; } -ObjectDescriptorSeq -Database::getAllObjectDescriptors(const string& expression) +ObjectInfo +Database::getObjectInfo(const Ice::Identity& id) +{ + try + { + ObjectEntryPtr object = _objectCache.get(id); + return object->getObjectInfo(); + } + catch(ObjectNotExistException&) + { + } + + Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + IdentityObjectInfoDict objects(connection, _objectDbName); + IdentityObjectInfoDict::const_iterator p = objects.find(id); + if(p == objects.end()) + { + ObjectNotExistException ex; + ex.id = id; + throw ex; + } + return p->second; +} + +ObjectInfoSeq +Database::getAllObjectInfos(const string& expression) { + ObjectInfoSeq infos = _objectCache.getAll(expression); + Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectDescDict objects(connection, _objectDbName); - ObjectDescriptorSeq descriptors; - for(IdentityObjectDescDict::const_iterator p = objects.begin(); p != objects.end(); ++p) + IdentityObjectInfoDict objects(connection, _objectDbName); + for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) { - if(expression.empty() || - IceUtil::match(Ice::identityToString(p->second.proxy->ice_getIdentity()), expression, true)) + if(expression.empty() || IceUtil::match(Ice::identityToString(p->first), expression, true)) { - descriptors.push_back(p->second); + infos.push_back(p->second); } } - return descriptors; + return infos; +} + +const TraceLevelsPtr& +Database::getTraceLevels() const +{ + return _traceLevels; +} + +int +Database::getNodeSessionTimeout() const +{ + return _nodeSessionTimeout; } void Database::checkServerForAddition(const string& name) { - if(_servers.find(name) != _servers.end()) + if(_serverCache.has(name)) { ServerExistsException ex; ex.name = name; @@ -1068,7 +1091,7 @@ Database::checkServerForAddition(const string& name) void Database::checkAdapterForAddition(const string& id) { - if(_serversByAdapterId.find(id) != _serversByAdapterId.end() || _adapters.find(id) != _adapters.end()) + if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end()) { AdapterExistsException ex; ex.id = id; @@ -1079,7 +1102,7 @@ Database::checkAdapterForAddition(const string& id) void Database::checkObjectForAddition(const Ice::Identity& objectId) { - if(_objects.find(objectId) != _objects.end()) + if(_objectCache.has(objectId) || _objects.find(objectId) != _objects.end()) { ObjectExistsException ex; ex.id = objectId; @@ -1147,440 +1170,20 @@ Database::removeServers(const string& application, const ServerInstanceDescripto } } -Database::ServerEntryPtr +ServerEntryPtr Database::addServer(const string& application, const ServerInstanceDescriptor& instance) { - ServerEntryPtr entry; - map::const_iterator q = _servers.find(instance.descriptor->name); - if(q != _servers.end()) - { - entry = q->second; - entry->update(instance); - } - else - { - entry = new ServerEntry(*this, instance); - _servers.insert(make_pair(instance.descriptor->name, entry)); - } - - map >::iterator p = _serversByNode.find(instance.node); - if(p == _serversByNode.end()) - { - p = _serversByNode.insert(make_pair(instance.node, set())).first; - } - p->second.insert(p->second.begin(), instance.descriptor->name); - - _applicationsByServerName.insert(make_pair(instance.descriptor->name, application)); - - forEachComponent(AddComponent(*this, entry))(instance); - return entry; + return _serverCache.add(instance.descriptor->name, instance, application); } -Database::ServerEntryPtr +ServerEntryPtr Database::updateServer(const ServerInstanceDescriptor& instance) { - // - // Get the server entry and the current descriptor then check - // if the server descriptor really changed. - // - ServerEntryPtr entry; - map::const_iterator q = _servers.find(instance.descriptor->name); - assert(q != _servers.end()); - - entry = q->second; - ServerInstanceDescriptor old = entry->getDescriptor(); - - // - // If the node changed, move the server from the old node to the - // new one. - // - if(old.node != instance.node) - { - map >::iterator p = _serversByNode.find(old.node); - assert(p != _serversByNode.end()); - p->second.erase(instance.descriptor->name); - if(p->second.empty()) - { - _serversByNode.erase(p); - } - p = _serversByNode.find(instance.node); - if(p == _serversByNode.end()) - { - p = _serversByNode.insert(make_pair(instance.node, set())).first; - } - p->second.insert(p->second.begin(), instance.descriptor->name); - } - - // - // Remove the object adapters and objects from the old descriptor. - // - forEachComponent(objFunc(*this, &Database::removeComponent))(old); - - // - // Update the server entry. - // - entry->update(instance); - - // - // Add the new object adapters and objects. - // - forEachComponent(AddComponent(*this, entry))(instance); - return entry; + return _serverCache.update(instance); } -Database::ServerEntryPtr +ServerEntryPtr Database::removeServer(const string& application, const ServerInstanceDescriptor& instance) { - ServerEntryPtr entry; - map::iterator q = _servers.find(instance.descriptor->name); - assert(q != _servers.end()); - - map >::iterator p = _serversByNode.find(instance.node); - assert(p != _serversByNode.end()); - p->second.erase(instance.descriptor->name); - if(p->second.empty()) - { - _serversByNode.erase(p); - } - - entry = q->second; - entry->destroy(); - - _applicationsByServerName.erase(instance.descriptor->name); - - // - // Remove the object adapters and objects. - // - forEachComponent(objFunc(*this, &Database::removeComponent))(instance); - return entry; -} - -void -Database::clearServer(const std::string& name) -{ - Lock sync(*this); - map::iterator p = _servers.find(name); - if(p != _servers.end()) - { - if(p->second->canRemove()) - { - _servers.erase(p); - } - } -} - -void -Database::addComponent(const ServerEntryPtr& entry, const ComponentDescriptorPtr& component) -{ - for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) - { - _serversByAdapterId.insert(make_pair(q->id, entry)); - for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) - { - _objects.put(make_pair(r->proxy->ice_getIdentity(), *r)); - } - } -} - -void -Database::removeComponent(const ComponentDescriptorPtr& component) -{ - for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q) - { - _serversByAdapterId.erase(q->id); - for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) - { - _objects.erase(r->proxy->ice_getIdentity()); - } - } -} - -Database::ServerEntry::ServerEntry(Database& database, const ServerInstanceDescriptor& descriptor) : - _database(database), - _synchronizing(false) -{ - _load.reset(new ServerInstanceDescriptor()); - *_load = descriptor; -} - -void -Database::ServerEntry::sync() -{ - map adapters; - int at, dt; - try - { - sync(adapters, at, dt); - } - catch(const NodeUnreachableException&) - { - } -} - -bool -Database::ServerEntry::needsSync() const -{ - Lock sync(*this); - return _failed; -} - -void -Database::ServerEntry::update(const ServerInstanceDescriptor& instance) -{ - Lock sync(*this); - - auto_ptr descriptor(new ServerInstanceDescriptor()); - *descriptor = instance; - - if(_loaded.get() && descriptor->node != _loaded->node) - { - assert(!_destroy.get()); - _destroy = _loaded; - } - else if(_load.get() && descriptor->node != _load->node) - { - assert(!_destroy.get()); - _destroy = _load; - } - - _load = descriptor; - _loaded.reset(0); - _proxy = 0; - _adapters.clear(); -} - -void -Database::ServerEntry::destroy() -{ - Lock sync(*this); - if(_loaded.get()) - { - assert(!_destroy.get()); - _destroy = _loaded; - } - else if(_load.get()) - { - assert(!_destroy.get()); - _destroy = _load; - } - - _load.reset(0); - _loaded.reset(0); - _proxy = 0; - _adapters.clear(); -} - -ServerInstanceDescriptor -Database::ServerEntry::getDescriptor() -{ - Lock sync(*this); - if(_proxy) - { - return *_loaded.get(); - } - else - { - return *_load.get(); - } -} - -ServerPrx -Database::ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout) -{ - ServerPrx proxy; - { - Lock sync(*this); - if(_proxy) // Synced - { - proxy = _proxy; - activationTimeout = _activationTimeout; - deactivationTimeout = _deactivationTimeout; - } - } - - if(proxy) - { - try - { - proxy->ice_ping(); - return proxy; - } - catch(const Ice::LocalException& ex) - { - } - } - - StringAdapterPrxDict adapters; - return sync(adapters, activationTimeout, deactivationTimeout); -} - -AdapterPrx -Database::ServerEntry::getAdapter(const string& id) -{ - AdapterPrx proxy; - { - Lock sync(*this); - if(_proxy) // Synced - { - proxy = _adapters[id]; - } - } - - if(proxy) - { - try - { - proxy->ice_ping(); - return proxy; - } - catch(const Ice::LocalException& ex) - { - } - } - - StringAdapterPrxDict adapters; - int activationTimeout, deactivationTimeout; - sync(adapters, activationTimeout, deactivationTimeout); - return adapters[id]; -} - -ServerPrx -Database::ServerEntry::sync(map& adapters, int& activationTimeout, int& deactivationTimeout) -{ - ServerDescriptorPtr load; - string loadNode; - ServerDescriptorPtr destroy; - string destroyNode; - { - Lock sync(*this); - while(_synchronizing) - { - wait(); - } - - if(!_load.get() && !_destroy.get()) - { - _load = _loaded; // Re-load the current server. - } - - _synchronizing = true; - _failed = false; - if(_load.get()) - { - load = _load->descriptor; - loadNode = _load->node; - } - if(_destroy.get()) - { - destroy = _destroy->descriptor; - destroyNode = _destroy->node; - } - } - - ServerPrx proxy; - try - { - if(destroy) - { - try - { - _database.getNode(destroyNode)->destroyServer(destroy->name); - } - catch(const NodeNotExistException& ex) - { - if(!load) - { - throw NodeUnreachableException(); - } - } - catch(const Ice::LocalException& ex) - { - if(!load) - { - throw NodeUnreachableException(); - } - } - } - - if(load) - { - try - { - proxy = _database.getNode(loadNode)->loadServer(load, adapters, activationTimeout, deactivationTimeout); - proxy = ServerPrx::uncheckedCast(proxy->ice_collocationOptimization(false)); - } - catch(const NodeNotExistException& ex) - { - throw NodeUnreachableException(); - } - catch(const DeploymentException& ex) - { - Ice::Warning out(_database._traceLevels->logger); - out << "failed to load server on node `" << loadNode << "':\n" << ex; - throw NodeUnreachableException(); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_database._traceLevels->logger); - out << "unexpected exception while loading on node `" << loadNode << "':\n" << ex; - throw NodeUnreachableException(); - } - } - } - catch(const NodeUnreachableException& ex) - { - { - Lock sync(*this); - _synchronizing = false; - _destroy.reset(0); - _failed = true; - notifyAll(); - } - if(!load && destroy) - { - _database.clearServer(destroy->name); - } - throw; - } - - { - Lock sync(*this); - _synchronizing = false; - _loaded = _load; - _load.reset(0); - _destroy.reset(0); - - // - // Set timeout on server and adapter proxies. Most of the - // calls on the proxies shouldn't block for longer than the - // node session timeout. Calls that might block for a longer - // time should set the correct timeout before invoking on the - // proxy (e.g.: server start/stop, adapter activate). - // - int timeout = _database._nodeSessionTimeout * 1000; // sec to ms - _proxy = proxy ? ServerPrx::uncheckedCast(proxy->ice_timeout(timeout)) : ServerPrx(); - _adapters.clear(); - for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) - { - AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(timeout)); - _adapters.insert(make_pair(p->first, adapter)); - } - activationTimeout += _database._nodeSessionTimeout; - deactivationTimeout += _database._nodeSessionTimeout; - _activationTimeout = activationTimeout; - _deactivationTimeout = deactivationTimeout; - notifyAll(); - } - if(!load && destroy) - { - _database.clearServer(destroy->name); - } - return proxy; -} - -bool -Database::ServerEntry::canRemove() -{ - Lock sync(*this); - return !_loaded.get() && !_load.get() && !_destroy.get(); + return _serverCache.remove(instance.descriptor->name); } -- cgit v1.2.3