diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 507 |
1 files changed, 267 insertions, 240 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 77b74d51b5d..d0f64b6e7b8 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -134,14 +134,14 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache, sessionTimeout), - _clientProxy(_communicator->stringToProxy("dummy")), - _serverProxy(_communicator->stringToProxy("dummy")), _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)), _applications(_connection, _applicationDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName), _lock(0), - _serial(-1) + _applicationSerial(0), + _adapterSerial(0), + _objectSerial(0) { ServerEntrySeq entries; for(StringApplicationInfoDict::const_iterator p = _applications.begin(); p != _applications.end(); ++p) @@ -164,6 +164,12 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _objectCache.setTraceLevels(_traceLevels); _allocatableObjectCache.setTraceLevels(_traceLevels); + _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter); + _registryObserverTopic = new RegistryObserverTopic(_topicManager); + _applicationObserverTopic = new ApplicationObserverTopic(_topicManager, _applications); + _adapterObserverTopic = new AdapterObserverTopic(_topicManager, _adapters); + _objectObserverTopic = new ObjectObserverTopic(_topicManager, _objects); + // // Register a default servant to manage manually registered object adapters. // @@ -197,26 +203,36 @@ Database::getInstanceName() const return _instanceName; } -RegistryObserverTopicPtr -Database::getRegistryObserverTopic() const -{ - Lock sync(*this); - return _registryObserverTopic; -} - -NodeObserverTopicPtr -Database::getNodeObserverTopic() const -{ - Lock sync(*this); - return _nodeObserverTopic; -} - void -Database::clearTopics() -{ - Lock sync(*this); - _registryObserverTopic = 0; - _nodeObserverTopic = 0; +Database::destroyTopics() +{ + _registryObserverTopic->destroy(); + _nodeObserverTopic->destroy(); + _applicationObserverTopic->destroy(); + _adapterObserverTopic->destroy(); + _objectObserverTopic->destroy(); +} + +ObserverTopicPtr +Database::getObserverTopic(TopicName name) const +{ + switch(name) + { + case RegistryObserverTopicName: + return _registryObserverTopic; + case NodeObserverTopicName: + return _nodeObserverTopic; + case ApplicationObserverTopicName: + return _applicationObserverTopic; + case AdapterObserverTopicName: + return _adapterObserverTopic; + case ObjectObserverTopicName: + return _objectObserverTopic; + default: + assert(false); + break; + } + return 0; // Keep the compiler happy. } void @@ -246,7 +262,7 @@ Database::lock(AdminSessionI* session, const string& userId) _lock = session; _lockUserId = userId; - return _serial; + return _applicationSerial; } void @@ -263,138 +279,99 @@ Database::unlock(AdminSessionI* session) } void -Database::initMaster() +Database::syncApplications(const ApplicationInfoSeq& applications) { - Lock sync(*this); - - _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager); - _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager); - _serial = 0; - - ApplicationInfoSeq applications; - for(StringApplicationInfoDict::const_iterator p = _applications.begin(); p != _applications.end(); ++p) - { - applications.push_back(p->second); - } - AdapterInfoSeq adapters; - for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) - { - adapters.push_back(q->second); - } - ObjectInfoSeq objects; - for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) + int serial; { - objects.push_back(r->second); - } - _registryObserverTopic->getPublisher()->init(_serial, applications, adapters, objects); -} - -void -Database::initReplica(int masterSerial, - const ApplicationInfoSeq& applications, - const AdapterInfoSeq& adapters, - const ObjectInfoSeq& objects) -{ - Lock sync(*this); - - _serial = masterSerial; + Lock sync(*this); - ServerEntrySeq entries; - set<string> names; - for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) - { - try + ServerEntrySeq entries; + set<string> names; + for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) { - StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); - if(s != _applications.end()) + try { - ApplicationHelper previous(_communicator, s->second.descriptor); - ApplicationHelper helper(_communicator, p->descriptor); - reload(previous, helper, entries, p->uuid, p->revision); + StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); + if(s != _applications.end()) + { + ApplicationHelper previous(_communicator, s->second.descriptor); + ApplicationHelper helper(_communicator, p->descriptor); + reload(previous, helper, entries, p->uuid, p->revision); + } + else + { + load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); + } } - else + catch(const DeploymentException& ex) { - load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; } + _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); + names.insert(p->descriptor.name); } - catch(const DeploymentException& ex) + StringApplicationInfoDict::iterator s = _applications.begin(); + while(s != _applications.end()) { - Ice::Warning warn(_traceLevels->logger); - warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; + if(names.find(s->first) == names.end()) + { + unload(ApplicationHelper(_communicator, s->second.descriptor), entries); + _applications.erase(s++); + } + else + { + ++s; + } } - _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); - names.insert(p->descriptor.name); + serial = ++_applicationSerial; } - StringApplicationInfoDict::iterator s = _applications.begin(); - while(s != _applications.end()) + + _applicationObserverTopic->applicationInit(serial, applications); +} + +void +Database::syncAdapters(const AdapterInfoSeq& adapters) +{ + int serial; { - if(names.find(s->first) == names.end()) - { - unload(ApplicationHelper(_communicator, s->second.descriptor), entries); - _applications.erase(s++); - } - else + Lock sync(*this); + _adapters.clear(); + for(AdapterInfoSeq::const_iterator r = adapters.end(); r != adapters.end(); ++r) { - ++s; + _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); } + serial = ++_adapterSerial; } - _objects.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) - { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); - } - - _adapters.clear(); - for(AdapterInfoSeq::const_iterator r = adapters.end(); r != adapters.end(); ++r) - { - _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); - } -} -void -Database::setClientProxy(const Ice::ObjectPrx& proxy) -{ - Lock sync(*this); - _clientProxy = proxy; + _adapterObserverTopic->adapterInit(serial, adapters); } void -Database::setServerProxy(const Ice::ObjectPrx& proxy) +Database::syncObjects(const ObjectInfoSeq& objects) { - Lock sync(*this); - _serverProxy = proxy; -} - -Ice::ObjectPrx -Database::getClientProxy() const -{ - Lock sync(*this); - return _clientProxy; + int serial; + { + Lock sync(*this); + _objects.clear(); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + } + serial = ++_objectSerial; + } + _objectObserverTopic->objectInit(serial, objects); } Ice::ObjectPrx -Database::getServerProxy() const -{ - Lock sync(*this); - return _serverProxy; -} - -void -Database::updateReplicatedWellKnownObjects() +Database::getReplicatedEndpoints(const string& name, const Ice::ObjectPrx& proxy) { - Ice::ObjectPrx clientProxy = _replicaCache.getClientProxy(getClientProxy()); - Ice::Identity id; - id.category = _instanceName; - id.name = "Query"; - ObjectInfo info; - info.type = Query::ice_staticId(); - info.proxy = clientProxy->ice_identity(id); - addObject(info, true); + return _replicaCache.getEndpoints(name, proxy); } void -Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc, int masterSerial) +Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc) { ServerEntrySeq entries; string uuid = IceUtil::generateUUID(); @@ -462,7 +439,7 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc _applications.put(StringApplicationInfoDict::value_type(desc.name, info)); - serial = ++_serial; + serial = ++_applicationSerial; _updating.erase(desc.name); notifyAll(); } @@ -470,10 +447,7 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc // // Notify the observers. // - if(_registryObserverTopic) - { - _registryObserverTopic->getPublisher()->applicationAdded(serial, info); - } + _applicationObserverTopic->applicationAdded(serial, info); if(_traceLevels->application > 0) { @@ -483,9 +457,7 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc } void -Database::updateApplicationDescriptor(AdminSessionI* session, - const ApplicationUpdateDescriptor& update, - int masterSerial) +Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update) { ServerEntrySeq entries; ApplicationInfo oldApp; @@ -597,7 +569,7 @@ Database::instantiateServer(AdminSessionI* session, } void -Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name, int masterSerial) +Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name) { ServerEntrySeq entries; int serial; @@ -632,16 +604,13 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& _applications.erase(p); - serial = ++_serial; + serial = ++_applicationSerial; } // // Notify the observers // - if(_registryObserverTopic) - { - _registryObserverTopic->getPublisher()->applicationRemoved(serial, name); - } + _applicationObserverTopic->applicationRemoved(serial, name); if(_traceLevels->application > 0) { @@ -680,10 +649,18 @@ Database::addNode(const string& name, const NodeSessionIPtr& session) { _nodeCache.get(name, true)->setSession(session); - ObjectInfo info; - info.type = Node::ice_staticId(); - info.proxy = session->getNode(); - addObject(info, true); +// // +// // Only the master adds the node well-known proxy to its +// // database. The well-known proxy will be transmitted to the +// // replicas through the replication of the database. +// // +// if(_master) +// { + ObjectInfo info; + info.type = Node::ice_staticId(); + info.proxy = session->getNode(); + addObject(info, true); +// } } NodePrx @@ -716,65 +693,53 @@ Database::removeNode(const string& name, const NodeSessionIPtr& session, bool sh // observer to ensure that only nodes which are up are teared // down). // - NodeObserverTopicPtr topic = getNodeObserverTopic(); - if(topic) - { - topic->getPublisher()->nodeDown(name); - } + _nodeObserverTopic->nodeDown(name); _nodeCache.get(name)->setSession(0); } +Ice::StringSeq +Database::getAllNodes(const string& expression) +{ + return _nodeCache.getAll(expression); +} + void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { _replicaCache.add(name, session); - // - // Only the master adds the node well-known proxy to its - // database. The well-known proxy will be transmitted to the - // replicas through the replication of the database. - // - if(_master) - { - ObjectInfo info; - info.type = InternalRegistry::ice_staticId(); - info.proxy = session->getProxy(); - addObject(info, true); - } + _applicationObserverTopic->subscribe(session->getObserver()); + _adapterObserverTopic->subscribe(session->getObserver()); + _objectObserverTopic->subscribe(session->getObserver()); +} - RegistryObserverTopicPtr topic = getRegistryObserverTopic(); - if(topic) - { - topic->subscribe(session->getObserver()); - } +InternalRegistryPrx +Database::getReplica(const string& name) const +{ + return _replicaCache.get(name)->getProxy(); } -void -Database::removeReplica(const string& name, const ReplicaSessionIPtr& session, bool shutdown) +RegistryInfo +Database::getReplicaInfo(const string& name) const { - // - // If this registry isn't being shutdown remove the replica - // well-known proxy from the database. - // - if(!shutdown) - { - removeObject(session->getProxy()->ice_getIdentity()); - } + return _replicaCache.get(name)->getInfo(); +} - RegistryObserverTopicPtr topic = getRegistryObserverTopic(); - if(topic) - { - topic->unsubscribe(session->getObserver()); - } +void +Database::removeReplica(const string& name, const ReplicaSessionIPtr& session) +{ + _applicationObserverTopic->unsubscribe(session->getObserver()); + _adapterObserverTopic->unsubscribe(session->getObserver()); + _objectObserverTopic->unsubscribe(session->getObserver()); _replicaCache.remove(name); } -Ice::StringSeq -Database::getAllNodes(const string& expression) +Ice::StringSeq +Database::getAllReplicas(const string& expression) { - return _nodeCache.getAll(expression); + return _replicaCache.getAll(expression); } ServerInfo @@ -810,8 +775,7 @@ Database::getAllNodeServers(const string& node) } bool -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy, - int masterSerial) +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { AdapterInfo info; int serial; @@ -851,7 +815,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr _adapters.erase(p); } - serial = ++_serial; + serial = ++_adapterSerial; } if(_traceLevels->adapter > 0) @@ -864,24 +828,21 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr } } - if(_registryObserverTopic) + if(proxy) { - if(proxy) + if(updated) { - if(updated) - { - _registryObserverTopic->getPublisher()->adapterUpdated(serial, info); - } - else - { - _registryObserverTopic->getPublisher()->adapterAdded(serial, info); - } + _adapterObserverTopic->adapterUpdated(serial, info); } else { - _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId); + _adapterObserverTopic->adapterAdded(serial, info); } } + else + { + _adapterObserverTopic->adapterRemoved(serial, adapterId); + } return true; } @@ -943,12 +904,12 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { - serial = ++_serial; + serial = ++_adapterSerial; } else { - serial = _serial; - _serial += static_cast<int>(static_cast<int>(infos.size())); + serial = _adapterSerial; + _adapterSerial += static_cast<int>(static_cast<int>(infos.size())); } } @@ -958,18 +919,15 @@ Database::removeAdapter(const string& adapterId) out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; } - if(_registryObserverTopic) + if(infos.empty()) { - if(infos.empty()) - { - _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId); - } - else + _adapterObserverTopic->adapterRemoved(serial, adapterId); + } + else + { + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) { - for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) - { - _registryObserverTopic->getPublisher()->adapterUpdated(++serial, *p); - } + _adapterObserverTopic->adapterUpdated(++serial, *p); } } } @@ -1125,7 +1083,7 @@ Database::getAllAdapters(const string& expression) } void -Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int masterSerial) +Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); @@ -1150,22 +1108,19 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int } _objects.put(IdentityObjectInfoDict::value_type(id, info)); - serial = ++_serial; + serial = ++_objectSerial; } // // Notify the observers. // - if(_registryObserverTopic) + if(!update) { - if(!update) - { - _registryObserverTopic->getPublisher()->objectAdded(serial, info); - } - else - { - _registryObserverTopic->getPublisher()->objectUpdated(serial, info); - } + _objectObserverTopic->objectAdded(serial, info); + } + else + { + _objectObserverTopic->objectUpdated(serial, info); } if(_traceLevels->object > 0) @@ -1183,7 +1138,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int } void -Database::removeObject(const Ice::Identity& id, int masterSerial) +Database::removeObject(const Ice::Identity& id) { int serial; { @@ -1207,16 +1162,13 @@ Database::removeObject(const Ice::Identity& id, int masterSerial) } _objects.erase(p); - serial = ++_serial; + serial = ++_objectSerial; } // // Notify the observers. // - if(_registryObserverTopic) - { - _registryObserverTopic->getPublisher()->objectRemoved(serial, id); - } + _objectObserverTopic->objectRemoved(serial, id); if(_traceLevels->object > 0) { @@ -1226,7 +1178,7 @@ Database::removeObject(const Ice::Identity& id, int masterSerial) } void -Database::updateObject(const Ice::ObjectPrx& proxy, int masterSerial) +Database::updateObject(const Ice::ObjectPrx& proxy) { const Ice::Identity id = proxy->ice_getIdentity(); int serial; @@ -1255,16 +1207,13 @@ Database::updateObject(const Ice::ObjectPrx& proxy, int masterSerial) info.proxy = proxy; p.set(info); - serial = ++_serial; + serial = ++_objectSerial; } // // Notify the observers. // - if(_registryObserverTopic) - { - _registryObserverTopic->getPublisher()->objectUpdated(serial, info); - } + _objectObserverTopic->objectUpdated(serial, info); if(_traceLevels->object > 0) { @@ -1274,6 +1223,88 @@ Database::updateObject(const Ice::ObjectPrx& proxy, int masterSerial) } void +Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) +{ + int serial; + vector<bool> updated; + { + Lock sync(*this); + + Freeze::TransactionHolder txHolder(_connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + updated.push_back(_objects.find(p->proxy->ice_getIdentity()) != _objects.end()); + _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); + } + serial = _objectSerial; + _objectSerial += static_cast<int>(static_cast<int>(objects.size())); + txHolder.commit(); + } + + // + // Notify the observers. + // + vector<bool>::const_iterator q = updated.begin(); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p, ++q) + { + // + // TODO: Add a better observer call? + // + if(!*q) + { + _objectObserverTopic->objectAdded(++serial, *p); + } + else + { + _objectObserverTopic->objectUpdated(++serial, *p); + } + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + if(!*q) + { + out << "added object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; + } + else + { + out << "updated object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; + } + } + } +} + +void +Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) +{ + int serial; + { + Freeze::TransactionHolder txHolder(_connection); + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + _objects.erase(p->proxy->ice_getIdentity()); + } + serial = _objectSerial; + _objectSerial += static_cast<int>(static_cast<int>(objects.size())); + txHolder.commit(); + } + + // + // Notify the observers. + // + for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + _objectObserverTopic->objectRemoved(++serial, p->proxy->ice_getIdentity()); + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "removed object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; + } + } +} + +void Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request) { _allocatableObjectCache.get(id)->allocate(request); @@ -1741,7 +1772,7 @@ Database::finishUpdate(ServerEntrySeq& entries, updateInfo.descriptor = update; _applications.put(StringApplicationInfoDict::value_type(update.name, info)); - serial = ++_serial; + serial = ++_applicationSerial; _updating.erase(update.name); notifyAll(); } @@ -1749,10 +1780,7 @@ Database::finishUpdate(ServerEntrySeq& entries, // // Notify the observers. // - if(_registryObserverTopic) - { - _registryObserverTopic->getPublisher()->applicationUpdated(serial, updateInfo); - } + _applicationObserverTopic->applicationUpdated(serial, updateInfo); if(_traceLevels->application > 0) { @@ -1760,4 +1788,3 @@ Database::finishUpdate(ServerEntrySeq& entries, out << "updated application `" << update.name << "'"; } } - |