diff options
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 328 |
1 files changed, 227 insertions, 101 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 1ee45b553e8..bb7e5a1e86f 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -16,6 +16,7 @@ #include <IceGrid/DescriptorHelper.h> #include <IceGrid/NodeSessionI.h> #include <IceGrid/Session.h> +#include <IceGrid/Topics.h> #include <algorithm> #include <functional> @@ -108,59 +109,45 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob } -Database::Database(const Ice::ObjectAdapterPtr& adapter, - const string& envName, +Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, + const IceStorm::TopicManagerPrx& topicManager, + const Ice::ObjectPrx& clientProxy, + const Ice::ObjectPrx& serverProxy, const string& instanceName, int nodeSessionTimeout, const TraceLevelsPtr& traceLevels) : - _communicator(adapter->getCommunicator()), - _internalAdapter(adapter), - _envName(envName), + _communicator(registryAdapter->getCommunicator()), + _internalAdapter(registryAdapter), + _topicManager(topicManager), + _envName("Registry"), _instanceName(instanceName), - _traceLevels(traceLevels), - _nodeCache(_communicator, nodeSessionTimeout), + _traceLevels(traceLevels), + _replicaCache(_communicator, topicManager, instanceName, clientProxy, serverProxy), + _nodeCache(_communicator, _replicaCache, nodeSessionTimeout), _objectCache(_communicator), _allocatableObjectCache(_communicator), _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), - _connection(Freeze::createConnection(adapter->getCommunicator(), envName)), + _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)), _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName), _lock(0), - _serial(0) + _serial(-1) { // - // Cache the servers & adapters. + // Register a default servant to manage manually registered object adapters. // - ServerEntrySeq entries; - for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) + __setNoDelete(true); + try { - try - { - load(ApplicationHelper(_communicator, p->second), entries); - } - catch(const DeploymentException& ex) - { - Ice::Warning warn(_traceLevels->logger); - warn << "invalid application `" << p->first << "':\n" << ex.reason; - } + _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter"); } - - _serverCache.setTraceLevels(_traceLevels); - _nodeCache.setTraceLevels(_traceLevels); - _adapterCache.setTraceLevels(_traceLevels); - _objectCache.setTraceLevels(_traceLevels); - _allocatableObjectCache.setTraceLevels(_traceLevels); - - // - // Register a default servant to manage manually registered object adapters. - // - // NOTE: This must be done only once we're sure this constructor - // won't throw. The servant locator is holding a handle on this - // object and if an exception was thrown a bogus database object - // won't be referenced from the servant locator. - // - _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter"); + catch(...) + { + __setNoDelete(false); + throw; + } + __setNoDelete(false); } Database::~Database() @@ -179,41 +166,32 @@ Database::getInstanceName() const return _instanceName; } -void -Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver) +RegistryObserverTopicPtr +Database::getRegistryObserverTopic() const { - int serial; - ApplicationDescriptorSeq applications; - AdapterInfoSeq adapters; - ObjectInfoSeq objects; + Lock sync(*this); + return _registryObserverTopic; +} - _registryObserver = registryObserver; - _nodeObserver = nodeObserver; - serial = _serial; - - for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) - { - applications.push_back(p->second); - } - - for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) - { - adapters.push_back(q->second); - if(adapters.back().id.empty()) - { - adapters.back().id = q->first; - } - } - - for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) - { - objects.push_back(r->second); - } +NodeObserverTopicPtr +Database::getNodeObserverTopic() const +{ + Lock sync(*this); + return _nodeObserverTopic; +} - // - // Notify the observers. - // - _registryObserver->init(serial, applications, adapters, objects); +void +Database::clearTopics() +{ + Lock sync(*this); + _registryObserverTopic = 0; + _nodeObserverTopic = 0; +} + +int +Database::getSessionTimeout() const +{ + return _nodeCache.getSessionTimeout(); } void @@ -260,7 +238,103 @@ Database::unlock(AdminSessionI* session) } void -Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc) +Database::init(int serial) +{ + ApplicationDescriptorSeq applications; + AdapterInfoSeq adapters; + ObjectInfoSeq objects; + + // + // Cache the servers & adapters. + // + ServerEntrySeq entries; + for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) + { + applications.push_back(p->second); + try + { + load(ApplicationHelper(_communicator, p->second), entries); + } + catch(const DeploymentException& ex) + { + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->first << "':\n" << ex.reason; + } + } + + for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) + { + adapters.push_back(q->second); + if(adapters.back().id.empty()) + { + adapters.back().id = q->first; + } + } + + for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) + { + objects.push_back(r->second); + } + + _serverCache.setTraceLevels(_traceLevels); + _nodeCache.setTraceLevels(_traceLevels); + _replicaCache.setTraceLevels(_traceLevels); + _adapterCache.setTraceLevels(_traceLevels); + _objectCache.setTraceLevels(_traceLevels); + _allocatableObjectCache.setTraceLevels(_traceLevels); + + _serial = serial; + + if(_registryObserverTopic) + { + // + // Initialize the topic cache. + // + _registryObserverTopic->getPublisher()->init(_serial, applications, adapters, objects); + } +} + +void +Database::initMaster() +{ + Lock sync(*this); + _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager); + _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager); + init(0); +} + +void +Database::initReplica(int masterSerial, + const ApplicationDescriptorSeq& applications, + const AdapterInfoSeq& adapters, + const ObjectInfoSeq& objects) +{ + Lock sync(*this); + + _descriptors.clear(); + for(ApplicationDescriptorSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) + { + _descriptors.put(StringApplicationDescriptorDict::value_type(p->name, *p)); + } + + _objects.clear(); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + } + + _adapters.clear(); + for(AdapterInfoSeq::const_iterator r = adapters.end(); r != adapters.end(); ++r) + { + _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); + } + + init(masterSerial); + notifyAll(); +} + +void +Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc, int masterSerial) { ServerEntrySeq entries; { @@ -326,7 +400,10 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc // // Notify the observers. // - _registryObserver->applicationAdded(serial, desc); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->applicationAdded(serial, desc); + } if(_traceLevels->application > 0) { @@ -336,7 +413,8 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc } void -Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update) +Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update, + int masterSerial) { ServerEntrySeq entries; ApplicationDescriptor oldDesc; @@ -449,7 +527,7 @@ Database::instantiateServer(AdminSessionI* session, } void -Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name) +Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name, int masterSerial) { ServerEntrySeq entries; int serial; @@ -490,7 +568,10 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& // // Notify the observers // - _registryObserver->applicationRemoved(serial, name); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->applicationRemoved(serial, name); + } if(_traceLevels->application > 0) { @@ -527,6 +608,17 @@ Database::getAllApplications(const string& expression) void Database::addNode(const string& name, const NodeSessionIPtr& session) { + { + // + // Wait for the database to be initialized before to add a + // node. + // + Lock sync(*this); + while(_serial < 0) + { + wait(); + } + } _nodeCache.get(name, true)->setSession(session); } @@ -550,10 +642,26 @@ Database::removeNode(const string& name) // observer to ensure that only nodes which are up are teared // down). // - _nodeObserver->nodeDown(name); + if(_nodeObserverTopic) + { + _nodeObserverTopic->getPublisher()->nodeDown(name); + } + _nodeCache.get(name)->setSession(0); } +void +Database::addReplica(const string& name, const ReplicaSessionIPtr& session) +{ + _replicaCache.add(name, session, this); +} + +void +Database::removeReplica(const string& name) +{ + _replicaCache.remove(name, this); +} + Ice::StringSeq Database::getAllNodes(const string& expression) { @@ -593,7 +701,8 @@ Database::getAllNodeServers(const string& node) } bool -Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) +Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy, + int masterSerial) { AdapterInfo info; int serial; @@ -646,21 +755,24 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr } } - if(proxy) + if(_registryObserverTopic) { - if(updated) + if(proxy) { - _registryObserver->adapterUpdated(serial, info); + if(updated) + { + _registryObserverTopic->getPublisher()->adapterUpdated(serial, info); + } + else + { + _registryObserverTopic->getPublisher()->adapterAdded(serial, info); + } } else { - _registryObserver->adapterAdded(serial, info); + _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId); } } - else - { - _registryObserver->adapterRemoved(serial, adapterId); - } return true; } @@ -736,16 +848,19 @@ Database::removeAdapter(const string& adapterId) Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; } - - if(infos.empty()) - { - _registryObserver->adapterRemoved(serial, adapterId); - } - else + + if(_registryObserverTopic) { - for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + if(infos.empty()) { - _registryObserver->adapterUpdated(++serial, *p); + _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId); + } + else + { + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + _registryObserverTopic->getPublisher()->adapterUpdated(++serial, *p); + } } } } @@ -901,7 +1016,7 @@ Database::getAllAdapters(const string& expression) } void -Database::addObject(const ObjectInfo& info) +Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int masterSerial) { int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); @@ -912,7 +1027,7 @@ Database::addObject(const ObjectInfo& info) throw ObjectExistsException(id); } - if(_objects.find(id) != _objects.end()) + if(!replaceIfExistsInDatabase && _objects.find(id) != _objects.end()) { throw ObjectExistsException(id); } @@ -924,7 +1039,10 @@ Database::addObject(const ObjectInfo& info) // // Notify the observers. // - _registryObserver->objectAdded(serial, info); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->objectAdded(serial, info); + } if(_traceLevels->object > 0) { @@ -934,7 +1052,7 @@ Database::addObject(const ObjectInfo& info) } void -Database::removeObject(const Ice::Identity& id) +Database::removeObject(const Ice::Identity& id, int masterSerial) { int serial; { @@ -964,7 +1082,10 @@ Database::removeObject(const Ice::Identity& id) // // Notify the observers. // - _registryObserver->objectRemoved(serial, id); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->objectRemoved(serial, id); + } if(_traceLevels->object > 0) { @@ -974,7 +1095,7 @@ Database::removeObject(const Ice::Identity& id) } void -Database::updateObject(const Ice::ObjectPrx& proxy) +Database::updateObject(const Ice::ObjectPrx& proxy, int masterSerial) { const Ice::Identity id = proxy->ice_getIdentity(); int serial; @@ -1009,7 +1130,10 @@ Database::updateObject(const Ice::ObjectPrx& proxy) // // Notify the observers. // - _registryObserver->objectUpdated(serial, info); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->objectUpdated(serial, info); + } if(_traceLevels->object > 0) { @@ -1464,13 +1588,15 @@ Database::finishUpdate(ServerEntrySeq& entries, // // Notify the observers. // - _registryObserver->applicationUpdated(serial, update); + if(_registryObserverTopic) + { + _registryObserverTopic->getPublisher()->applicationUpdated(serial, update); + } if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "updated application `" << update.name << "'"; } - } |