// ********************************************************************** // // Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; using namespace IceGrid; const string Database::_descriptorDbName = "applications"; const string Database::_adapterDbName = "adapters"; const string Database::_objectDbName = "objects"; pointer_to_unary_function Database::_rand(IceUtil::random); namespace IceGrid { // // A default servant for adapter objects registered directly in the // registry database. // class AdapterI : public Adapter { public: AdapterI(const DatabasePtr& database) : _database(database) { } virtual void activate_async(const AMD_Adapter_activatePtr& cb, const Ice::Current& current) { assert(false); } virtual Ice::ObjectPrx getDirectProxy(const Ice::Current& current) const { return _database->getAdapterDirectProxy(current.id.name); } virtual void setDirectProxy(const ::Ice::ObjectPrx& proxy, const ::Ice::Current& current) { assert(false); } private: const DatabasePtr _database; }; // // A servant locator for the default servant above. // class AdapterServantLocator : public Ice::ServantLocator { public: AdapterServantLocator(const DatabasePtr& database) : _adapter(new AdapterI(database)) { } virtual Ice::ObjectPtr locate(const Ice::Current& current, Ice::LocalObjectPtr& cookie) { return _adapter; } virtual void finished(const Ice::Current&, const Ice::ObjectPtr&, const Ice::LocalObjectPtr&) { } virtual void deactivate(const std::string&) { } private: const AdapterPtr _adapter; }; struct ObjectLoadCI : binary_function&, pair&, bool> { bool operator()(const pair& lhs, const pair& rhs) { return lhs.second < rhs.second; } }; } Database::Database(const Ice::ObjectAdapterPtr& adapter, const string& envName, const string& instanceName, int nodeSessionTimeout, const TraceLevelsPtr& traceLevels) : _communicator(adapter->getCommunicator()), _internalAdapter(adapter), _envName(envName), _instanceName(instanceName), _traceLevels(traceLevels), _nodeCache(_communicator, nodeSessionTimeout), _objectCache(_communicator, _adapterCache), _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache), _connection(Freeze::createConnection(adapter->getCommunicator(), envName)), _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName), _lock(0), _serial(0) { // // Cache the servers & adapters. // ServerEntrySeq entries; for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { try { load(ApplicationHelper(_communicator, p->second), entries); } catch(const DeploymentException& ex) { Ice::Warning warn(_traceLevels->logger); warn << "invalid application `" << p->first << "':\n" << ex.reason; } } _serverCache.setTraceLevels(_traceLevels); _nodeCache.setTraceLevels(_traceLevels); _adapterCache.setTraceLevels(_traceLevels); _objectCache.setTraceLevels(_traceLevels); // // Register a default servant to manage manually registered object adapters. // // NOTE: This must be done only once we're sure this constructor // won't throw. The servant locator is holding a handle on this // object and if an exception was thrown a bogus database object // won't be referenced from the servant locator. // _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter"); } Database::~Database() { } void Database::destroy() { _nodeCache.destroy(); // Break cyclic reference count. } std::string Database::getInstanceName() const { return _instanceName; } void Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver) { int serial; ApplicationDescriptorSeq applications; AdapterInfoSeq adapters; ObjectInfoSeq objects; _registryObserver = registryObserver; _nodeObserver = nodeObserver; serial = _serial; for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { applications.push_back(p->second); } for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) { adapters.push_back(q->second); if(adapters.back().id.empty()) { adapters.back().id = q->first; } } for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) { objects.push_back(r->second); } // // Notify the observers. // _registryObserver->init(serial, applications, adapters, objects); } void Database::checkSessionLock(AdminSessionI* session) { if(_lock == 0 && session) { throw AccessDeniedException(); // Sessions must first acquire the lock! } else if(_lock != 0 && session != _lock) { throw AccessDeniedException(_lockUserId); // Lock held by another session. } } int Database::lock(AdminSessionI* session, const string& userId) { Lock sync(*this); if(_lock != 0 && session != _lock) { throw AccessDeniedException(_lockUserId); // Lock held by another session. } assert(_lock == 0 || _lock == session); _lock = session; _lockUserId = userId; return _serial; } void Database::unlock(AdminSessionI* session) { Lock sync(*this); if(_lock != session) { throw AccessDeniedException(); } _lock = 0; _lockUserId.clear(); } void Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc) { ServerEntrySeq entries; { Lock sync(*this); checkSessionLock(session); while(_updating.find(desc.name) != _updating.end()) { wait(); } if(_descriptors.find(desc.name) != _descriptors.end()) { throw DeploymentException("application `" + desc.name + "' already exists"); } ApplicationHelper helper(_communicator, desc); checkForAddition(helper); load(helper, entries); _updating.insert(desc.name); } // // Synchronize the servers on the nodes. If a server couldn't be // deployed we unload the application and throw. // try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } catch(const DeploymentException& ex) { { Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, desc), entries); _updating.erase(desc.name); notifyAll(); } try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } catch(const DeploymentException& ex) { // TODO: warning? } throw ex; } // // Save the application descriptor. // int serial; { Lock sync(*this); _descriptors.put(StringApplicationDescriptorDict::value_type(desc.name, desc)); serial = ++_serial; _updating.erase(desc.name); notifyAll(); } // // Notify the observers. // _registryObserver->applicationAdded(serial, desc); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "added application `" << desc.name << "'"; } } void Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update) { ServerEntrySeq entries; ApplicationDescriptor oldDesc; ApplicationDescriptor newDesc; { Lock sync(*this); checkSessionLock(session); while(_updating.find(update.name) != _updating.end()) { wait(); } StringApplicationDescriptorDict::const_iterator p = _descriptors.find(update.name); if(p == _descriptors.end()) { throw ApplicationNotExistException(update.name); } ApplicationHelper previous(_communicator, p->second); ApplicationHelper helper(_communicator, previous.update(update)); checkForUpdate(previous, helper); reload(previous, helper, entries); oldDesc = previous.getDefinition(); newDesc = helper.getDefinition(); _updating.insert(update.name); } finishUpdate(entries, update, oldDesc, newDesc); } void Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& newDesc) { ServerEntrySeq entries; ApplicationUpdateDescriptor update; ApplicationDescriptor oldDesc; { Lock sync(*this); checkSessionLock(session); while(_updating.find(update.name) != _updating.end()) { wait(); } StringApplicationDescriptorDict::const_iterator p = _descriptors.find(newDesc.name); if(p == _descriptors.end()) { throw ApplicationNotExistException(newDesc.name); } ApplicationHelper previous(_communicator, p->second); ApplicationHelper helper(_communicator, newDesc); update = helper.diff(previous); checkForUpdate(previous, helper); reload(previous, helper, entries); oldDesc = previous.getDefinition(); _updating.insert(update.name); } finishUpdate(entries, update, oldDesc, newDesc); } void Database::instantiateServer(AdminSessionI* session, const string& application, const string& node, const ServerInstanceDescriptor& instance) { ServerEntrySeq entries; ApplicationUpdateDescriptor update; ApplicationDescriptor oldDesc; ApplicationDescriptor newDesc; { Lock sync(*this); checkSessionLock(session); while(_updating.find(update.name) != _updating.end()) { wait(); } StringApplicationDescriptorDict::const_iterator p = _descriptors.find(application); if(p == _descriptors.end()) { throw ApplicationNotExistException(application); } ApplicationHelper previous(_communicator, p->second); ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance)); update = helper.diff(previous); checkForUpdate(previous, helper); reload(previous, helper, entries); oldDesc = previous.getDefinition(); newDesc = helper.getDefinition(); _updating.insert(update.name); } finishUpdate(entries, update, oldDesc, newDesc); } void Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name) { ServerEntrySeq entries; int serial; { Lock sync(*this); checkSessionLock(session); while(_updating.find(name) != _updating.end()) { wait(); } StringApplicationDescriptorDict::iterator p = _descriptors.find(name); if(p == _descriptors.end()) { throw ApplicationNotExistException(name); } try { ApplicationHelper helper(_communicator, p->second); unload(helper, entries); } catch(const DeploymentException&) { // // For some reasons the application became invalid. If // it's invalid, it's most likely not loaded either. So we // ignore the error and erase the descriptor. // } _descriptors.erase(p); serial = ++_serial; } // // Notify the observers // _registryObserver->applicationRemoved(serial, name); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "removed application `" << name << "'"; } for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } ApplicationDescriptor Database::getApplicationDescriptor(const std::string& name) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); StringApplicationDescriptorDict descriptors(connection, _descriptorDbName); StringApplicationDescriptorDict::const_iterator p = descriptors.find(name); if(p == descriptors.end()) { throw ApplicationNotExistException(name); } return p->second; } Ice::StringSeq Database::getAllApplications(const string& expression) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); StringApplicationDescriptorDict descriptors(connection, _descriptorDbName); return getMatchingKeys(descriptors, expression); } void Database::addNode(const string& name, const NodeSessionIPtr& session) { _nodeCache.get(name, true)->setSession(session); } NodePrx Database::getNode(const string& name) const { return _nodeCache.get(name)->getProxy(); } NodeInfo Database::getNodeInfo(const string& name) const { return _nodeCache.get(name)->getInfo(); } void Database::removeNode(const string& name) { // // We must notify the observer first (there's an assert in the // observer to ensure that only nodes which are up are teared // down). // _nodeObserver->nodeDown(name); _nodeCache.get(name)->setSession(0); } Ice::StringSeq Database::getAllNodes(const string& expression) { return _nodeCache.getAll(expression); } ServerInfo Database::getServerInfo(const std::string& id, bool resolve) { return _serverCache.get(id)->getServerInfo(resolve); } ServerPrx Database::getServer(const string& id) { int activationTimeout, deactivationTimeout; string node; return getServerWithTimeouts(id, activationTimeout, deactivationTimeout, node); } ServerPrx Database::getServerWithTimeouts(const string& id, int& activationTimeout, int& deactivationTimeout, string& node) { return _serverCache.get(id)->getProxy(activationTimeout, deactivationTimeout, node); } Ice::StringSeq Database::getAllServers(const string& expression) { return _serverCache.getAll(expression); } Ice::StringSeq Database::getAllNodeServers(const string& node) { return _nodeCache.get(node)->getServers(); } bool Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { AdapterInfo info; int serial; bool updated = false; { Lock sync(*this); if(_adapterCache.has(adapterId)) { return false; } StringAdapterInfoDict::iterator p = _adapters.find(adapterId); if(proxy) { if(p != _adapters.end()) { info = p->second; info.proxy = proxy; info.replicaGroupId = replicaGroupId; p.set(info); updated = true; } else { info.id = adapterId; info.proxy = proxy; info.replicaGroupId = replicaGroupId; _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); } } else { if(p == _adapters.end()) { return true; } _adapters.erase(p); } serial = ++_serial; } if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'"; if(!replicaGroupId.empty()) { out << " with replica group `" << replicaGroupId << "'"; } } if(proxy) { if(updated) { _registryObserver->adapterUpdated(serial, info); } else { _registryObserver->adapterAdded(serial, info); } } else { _registryObserver->adapterRemoved(serial, adapterId); } return true; } Ice::ObjectPrx Database::getAdapterDirectProxy(const string& adapterId) { Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); StringAdapterInfoDict adapters(connection, _adapterDbName); StringAdapterInfoDict::const_iterator p = adapters.find(adapterId); if(p != adapters.end()) { return p->second.proxy; } return 0; } void Database::removeAdapter(const string& adapterId) { AdapterInfoSeq infos; int serial; { Lock sync(*this); if(_adapterCache.has(adapterId)) { AdapterEntryPtr adpt = _adapterCache.get(adapterId); DeploymentException ex; ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; throw ex; } Freeze::TransactionHolder txHolder(_connection); StringAdapterInfoDict::iterator p = _adapters.find(adapterId); if(p != _adapters.end()) { _adapters.erase(p); } else { p = _adapters.findByReplicaGroupId(adapterId, true); if(p == _adapters.end()) { throw AdapterNotExistException(adapterId); } while(p != _adapters.end()) { AdapterInfo info = p->second; info.replicaGroupId = ""; infos.push_back(info); _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); ++p; } } txHolder.commit(); if(infos.empty()) { serial = ++_serial; } else { serial = _serial; _serial += infos.size(); } } if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; } if(infos.empty()) { _registryObserver->adapterRemoved(serial, adapterId); } else { for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { _registryObserver->adapterUpdated(++serial, *p); } } } AdapterPrx Database::getAdapter(const string& id, const string& replicaGroupId) { return _adapterCache.getServerAdapter(id)->getProxy(replicaGroupId); } vector > Database::getAdapters(const string& id, int& endpointCount, bool& replicaGroup) { // // First we check if the given adapter id is associated to a // server, if that's the case we get the adapter proxy from the // server. // try { return _adapterCache.get(id)->getProxies(endpointCount, replicaGroup); } catch(AdapterNotExistException&) { } // // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); StringAdapterInfoDict adapters(connection, _adapterDbName); StringAdapterInfoDict::const_iterator p = adapters.find(id); if(p != adapters.end()) { vector > adpts; Ice::Identity identity; identity.category = "IceGridAdapter"; identity.name = id; Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity); adpts.push_back(make_pair(id, AdapterPrx::uncheckedCast(adpt))); replicaGroup = false; endpointCount = 1; return adpts; } // // If it's not a regular object adapter, perhaps it's a replica // group... // p = adapters.findByReplicaGroupId(id, true); if(p != adapters.end()) { vector > adpts; while(p != adapters.end()) { Ice::Identity identity; identity.category = "IceGridAdapter"; identity.name = p->first; AdapterPrx adpt = AdapterPrx::uncheckedCast(_internalAdapter->createDirectProxy(identity)); adpts.push_back(make_pair(p->first, adpt)); ++p; } random_shuffle(adpts.begin(), adpts.end(), _rand); replicaGroup = true; endpointCount = static_cast(adpts.size()); return adpts; } throw AdapterNotExistException(id); } AdapterInfoSeq Database::getAdapterInfo(const string& id) { // // First we check if the given adapter id is associated to a // server, if that's the case we get the adapter proxy from the // server. // try { return _adapterCache.get(id)->getAdapterInfo(); } catch(AdapterNotExistException&) { } // // Otherwise, we check the adapter endpoint table -- if there's an // entry the adapter is managed by the registry itself. // Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); StringAdapterInfoDict adapters(connection, _adapterDbName); StringAdapterInfoDict::const_iterator p = adapters.find(id); if(p != adapters.end()) { AdapterInfoSeq infos; infos.push_back(p->second); return infos; } // // If it's not a regular object adapter, perhaps it's a replica // group... // p = adapters.findByReplicaGroupId(id, true); if(p != adapters.end()) { AdapterInfoSeq infos; while(p != adapters.end()) { infos.push_back(p->second); ++p; } return infos; } throw AdapterNotExistException(id); } Ice::StringSeq Database::getAllAdapters(const string& expression) { Lock sync(*this); vector result; vector ids = _adapterCache.getAll(expression); result.swap(ids); set groups; for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) { if(expression.empty() || IceUtil::match(p->first, expression, true)) { result.push_back(p->first); } string replicaGroupId = p->second.replicaGroupId; if(!replicaGroupId.empty() && (expression.empty() || IceUtil::match(replicaGroupId, expression, true))) { groups.insert(replicaGroupId); } } // // COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun. // //result.insert(result.end(), groups.begin(), groups.end()) for(set::const_iterator q = groups.begin(); q != groups.end(); ++q) { result.push_back(*q); } return result; } void Database::addObject(const ObjectInfo& info) { int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); { Lock sync(*this); if(_objectCache.has(id)) { throw ObjectExistsException(id); } if(_objects.find(id) != _objects.end()) { throw ObjectExistsException(id); } _objects.put(IdentityObjectInfoDict::value_type(id, info)); serial = ++_serial; } // // Notify the observers. // _registryObserver->objectAdded(serial, info); if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); out << "added object `" << _communicator->identityToString(id) << "'"; } } void Database::removeObject(const Ice::Identity& id) { int serial; { Lock sync(*this); if(_objectCache.has(id)) { DeploymentException ex; ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"; ex.reason += "the object was added with the application descriptor `"; ex.reason += _objectCache.get(id)->getApplication(); ex.reason += "'"; throw ex; } IdentityObjectInfoDict::iterator p = _objects.find(id); if(p == _objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } _objects.erase(p); serial = ++_serial; } // // Notify the observers. // _registryObserver->objectRemoved(serial, id); if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); out << "removed object `" << _communicator->identityToString(id) << "'"; } } void Database::updateObject(const Ice::ObjectPrx& proxy) { const Ice::Identity id = proxy->ice_getIdentity(); int serial; ObjectInfo info; { Lock sync(*this); if(_objectCache.has(id)) { DeploymentException ex; ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"; ex.reason += "the object was added with the application descriptor `"; ex.reason += _objectCache.get(id)->getApplication(); ex.reason += "'"; throw ex; } IdentityObjectInfoDict::iterator p = _objects.find(id); if(p == _objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } info = p->second; info.proxy = proxy; p.set(info); serial = ++_serial; } // // Notify the observers. // _registryObserver->objectUpdated(serial, info); if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); out << "updated object `" << _communicator->identityToString(id) << "'"; } } void Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request) { _objectCache.get(id)->allocate(request); } void Database::allocateObjectByType(const string& type, const ObjectAllocationRequestPtr& request) { _objectCache.allocateByType(type, request); } void Database::releaseObject(const Ice::Identity& id, const SessionIPtr& session) { _objectCache.get(id)->release(session); } Ice::ObjectPrx Database::getObjectProxy(const Ice::Identity& id) { try { // // Only return proxies for non allocatable objects. // return _objectCache.get(id)->getProxy(); } catch(ObjectNotRegisteredException&) { } Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); IdentityObjectInfoDict::const_iterator p = objects.find(id); if(p == objects.end()) { ObjectNotRegisteredException ex; ex.id = id; throw ex; } return p->second.proxy; } Ice::ObjectPrx Database::getObjectByType(const string& type) { Ice::ObjectProxySeq objs = getObjectsByType(type); return objs[IceUtil::random(objs.size())]; } Ice::ObjectPrx Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample) { Ice::ObjectProxySeq objs = getObjectsByType(type); random_shuffle(objs.begin(), objs.end(), _rand); vector > objectsWithLoad; objectsWithLoad.reserve(objs.size()); for(Ice::ObjectProxySeq::const_iterator p = objs.begin(); p != objs.end(); ++p) { float load = 1.0f; if(!(*p)->ice_getAdapterId().empty()) { try { load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample); } catch(const AdapterNotExistException&) { } } objectsWithLoad.push_back(make_pair(*p, load)); } return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), ObjectLoadCI())->first; } Ice::ObjectProxySeq Database::getObjectsByType(const string& type) { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) { proxies.push_back(p->second.proxy); } if(proxies.empty()) { throw ObjectNotRegisteredException(); } return proxies; } ObjectInfo Database::getObjectInfo(const Ice::Identity& id) { try { ObjectEntryPtr object = _objectCache.get(id); return object->getObjectInfo(); } catch(ObjectNotRegisteredException&) { } Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); IdentityObjectInfoDict::const_iterator p = objects.find(id); if(p == objects.end()) { throw ObjectNotRegisteredException(id); } return p->second; } ObjectInfoSeq Database::getAllObjectInfos(const string& expression) { ObjectInfoSeq infos = _objectCache.getAll(expression); Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) { if(expression.empty() || IceUtil::match(_communicator->identityToString(p->first), expression, true)) { infos.push_back(p->second); } } return infos; } ObjectInfoSeq Database::getObjectInfosByType(const string& type) { ObjectInfoSeq infos = _objectCache.getAllByType(type); Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) { infos.push_back(p->second); } return infos; } void Database::checkForAddition(const ApplicationHelper& app) { set serverIds; set adapterIds; set objectIds; app.getIds(serverIds, adapterIds, objectIds); for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition)); for_each(adapterIds.begin(), adapterIds.end(), objFunc(*this, &Database::checkAdapterForAddition)); for_each(objectIds.begin(), objectIds.end(), objFunc(*this, &Database::checkObjectForAddition)); } void Database::checkForUpdate(const ApplicationHelper& origApp, const ApplicationHelper& newApp) { set oldSvrs, newSvrs; set oldAdpts, newAdpts; set oldObjs, newObjs; origApp.getIds(oldSvrs, oldAdpts, oldObjs); newApp.getIds(newSvrs, newAdpts, newObjs); Ice::StringSeq addedSvrs; set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs)); for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition)); Ice::StringSeq addedAdpts; set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), set_inserter(addedAdpts)); for_each(addedAdpts.begin(), addedAdpts.end(), objFunc(*this, &Database::checkAdapterForAddition)); vector addedObjs; set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), set_inserter(addedObjs)); for_each(addedObjs.begin(), addedObjs.end(), objFunc(*this, &Database::checkObjectForAddition)); } void Database::checkServerForAddition(const string& id) { if(_serverCache.has(id)) { DeploymentException ex; ex.reason = "server `" + id + "' is already registered"; throw ex; } } void Database::checkAdapterForAddition(const string& id) { if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end() || _adapters.findByReplicaGroupId(id) != _adapters.end()) { DeploymentException ex; ex.reason = "adapter `" + id + "' is already registered"; throw ex; } } void Database::checkObjectForAddition(const Ice::Identity& objectId) { if(_objectCache.has(objectId) || _objects.find(objectId) != _objects.end()) { DeploymentException ex; ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered"; throw ex; } } void Database::load(const ApplicationHelper& app, ServerEntrySeq& entries) { const NodeDescriptorDict& nodes = app.getInstance().nodes; const string application = app.getInstance().name; for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) { _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { assert(!r->id.empty()); _adapterCache.addReplicaGroup(r->id, application, r->loadBalancing); for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { ObjectInfo info; info.type = o->type; info.proxy = _communicator->stringToProxy(_communicator->identityToString(o->id) + "@" + r->id); _objectCache.add(info, application, false, 0); // Not allocatable } } map servers = app.getServerInfos(); for(map::const_iterator p = servers.begin(); p != servers.end(); ++p) { entries.push_back(_serverCache.add(p->second)); } } void Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries) { map servers = app.getServerInfos(); for(map::const_iterator p = servers.begin(); p != servers.end(); ++p) { entries.push_back(_serverCache.remove(p->first)); } const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups; for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r) { for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.remove(o->id); } _adapterCache.removeReplicaGroup(r->id); } const NodeDescriptorDict& nodes = app.getInstance().nodes; const string application = app.getInstance().name; for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n) { _nodeCache.get(n->first)->removeDescriptor(application); } } void Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newApp, ServerEntrySeq& entries) { const string application = oldApp.getInstance().name; // // Remove destroyed servers. // map oldServers = oldApp.getServerInfos(); map newServers = newApp.getServerInfos(); vector load; map::const_iterator p; for(p = newServers.begin(); p != newServers.end(); ++p) { map::const_iterator q = oldServers.find(p->first); if(q == oldServers.end()) { load.push_back(p->second); } else if(p->second.node != q->second.node || !descriptorEqual(_communicator, p->second.descriptor, q->second.descriptor)) { _serverCache.remove(p->first, false); // Don't destroy the server if it was updated. load.push_back(p->second); } } for(p = oldServers.begin(); p != oldServers.end(); ++p) { map::const_iterator q = newServers.find(p->first); if(q == newServers.end()) { entries.push_back(_serverCache.remove(p->first)); } } // // Remove destroyed replica groups. // const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups; const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups; ReplicaGroupDescriptorSeq::const_iterator r; for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r) { ReplicaGroupDescriptorSeq::const_iterator t; for(t = newAdpts.begin(); t != newAdpts.end(); ++t) { if(t->id == r->id) { break; } } for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { _objectCache.remove(o->id); } if(t == newAdpts.end()) { _adapterCache.removeReplicaGroup(r->id); } } // // Remove all the node descriptors. // const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes; NodeDescriptorDict::const_iterator n; for(n = oldNodes.begin(); n != oldNodes.end(); ++n) { _nodeCache.get(n->first)->removeDescriptor(application); } // // Add back node descriptors. // const NodeDescriptorDict& newNodes = newApp.getInstance().nodes; for(n = newNodes.begin(); n != newNodes.end(); ++n) { _nodeCache.get(n->first, true)->addDescriptor(application, n->second); } // // Add back replica groups. // for(r = newAdpts.begin(); r != newAdpts.end(); ++r) { try { ReplicaGroupEntryPtr entry = _adapterCache.getReplicaGroup(r->id); entry->update(r->loadBalancing); } catch(const AdapterNotExistException&) { _adapterCache.addReplicaGroup(r->id, application, r->loadBalancing); } for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o) { ObjectInfo info; info.type = o->type; info.proxy = _communicator->stringToProxy(_communicator->identityToString(o->id) + "@" + r->id); _objectCache.add(info, application, false, 0); // Not allocatable } } // // Add back servers. // for(vector::const_iterator q = load.begin(); q != load.end(); ++q) { entries.push_back(_serverCache.add(*q)); } } void Database::finishUpdate(ServerEntrySeq& entries, const ApplicationUpdateDescriptor& update, const ApplicationDescriptor& oldDesc, const ApplicationDescriptor& newDesc) { // // Synchronize the servers on the nodes. If a server couldn't be // deployed we unload the application and throw. // try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } catch(const DeploymentException& ex) { { Lock sync(*this); entries.clear(); ApplicationHelper previous(_communicator, newDesc); ApplicationHelper helper(_communicator, oldDesc); reload(previous, helper, entries); _updating.erase(newDesc.name); notifyAll(); } try { for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } catch(const DeploymentException& ex) { // TODO: warning? } throw ex; } // // Save the application descriptor. // int serial; { Lock sync(*this); _descriptors.put(StringApplicationDescriptorDict::value_type(update.name, newDesc)); serial = ++_serial; _updating.erase(update.name); notifyAll(); } // // Notify the observers. // _registryObserver->applicationUpdated(serial, update); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "updated application `" << update.name << "'"; } }