diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-07-27 13:20:03 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-07-27 13:20:03 +0000 |
commit | dad0bda0d462b4730b168befd187ca43446f12e7 (patch) | |
tree | 1a964dc5a3bd2c945e0ee165ddda55c3e4281157 /cpp/src/IceGrid/Database.cpp | |
parent | Improved __checkMode (diff) | |
download | ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.bz2 ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.xz ice-dad0bda0d462b4730b168befd187ca43446f12e7.zip |
More IceGrid replication improvements.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 168 |
1 files changed, 121 insertions, 47 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 7fee927b7cd..aa683e5344b 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -15,6 +15,7 @@ #include <IceGrid/Util.h> #include <IceGrid/DescriptorHelper.h> #include <IceGrid/NodeSessionI.h> +#include <IceGrid/ReplicaSessionI.h> #include <IceGrid/Session.h> #include <IceGrid/Topics.h> @@ -116,10 +117,8 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, const IceStorm::TopicManagerPrx& topicManager, - const Ice::ObjectPrx& clientProxy, - const Ice::ObjectPrx& serverProxy, const string& instanceName, - int nodeSessionTimeout, + int sessionTimeout, const TraceLevelsPtr& traceLevels) : _communicator(registryAdapter->getCommunicator()), _internalAdapter(registryAdapter), @@ -127,11 +126,14 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _envName("Registry"), _instanceName(instanceName), _traceLevels(traceLevels), - _replicaCache(_communicator, topicManager, instanceName, clientProxy, serverProxy), - _nodeCache(_communicator, _replicaCache, nodeSessionTimeout), + _sessionTimeout(sessionTimeout), + _replicaCache(_communicator, topicManager), + _nodeCache(_communicator, _replicaCache), _objectCache(_communicator), _allocatableObjectCache(_communicator), - _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache), + _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache, sessionTimeout), + _clientProxy(_communicator->stringToProxy("dummy")), + _serverProxy(_communicator->stringToProxy("dummy")), _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)), _descriptors(_connection, _descriptorDbName), _objects(_connection, _objectDbName), @@ -153,6 +155,13 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, } } + _serverCache.setTraceLevels(_traceLevels); + _nodeCache.setTraceLevels(_traceLevels); + _replicaCache.setTraceLevels(_traceLevels); + _adapterCache.setTraceLevels(_traceLevels); + _objectCache.setTraceLevels(_traceLevels); + _allocatableObjectCache.setTraceLevels(_traceLevels); + // // Register a default servant to manage manually registered object adapters. // @@ -177,6 +186,7 @@ void Database::destroy() { _nodeCache.destroy(); // Break cyclic reference count. + _replicaCache.destroy(); } std::string @@ -207,12 +217,6 @@ Database::clearTopics() _nodeObserverTopic = 0; } -int -Database::getSessionTimeout() const -{ - return _nodeCache.getSessionTimeout(); -} - void Database::checkSessionLock(AdminSessionI* session) { @@ -261,13 +265,6 @@ Database::initMaster() { Lock sync(*this); - _serverCache.setTraceLevels(_traceLevels); - _nodeCache.setTraceLevels(_traceLevels); - _replicaCache.setTraceLevels(_traceLevels); - _adapterCache.setTraceLevels(_traceLevels); - _objectCache.setTraceLevels(_traceLevels); - _allocatableObjectCache.setTraceLevels(_traceLevels); - _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager); _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager); _serial = 0; @@ -298,19 +295,6 @@ Database::initReplica(int masterSerial, { Lock sync(*this); - if(_serial < 0) - { - _serverCache.setTraceLevels(_traceLevels); - _nodeCache.setTraceLevels(_traceLevels); - _replicaCache.setTraceLevels(_traceLevels); - _adapterCache.setTraceLevels(_traceLevels); - _objectCache.setTraceLevels(_traceLevels); - _allocatableObjectCache.setTraceLevels(_traceLevels); - } - else - { -// assert(_serial <= masterSerial); // TODO: Master might have been restarted. - } _serial = masterSerial; ServerEntrySeq entries; @@ -365,6 +349,47 @@ Database::initReplica(int masterSerial, } void +Database::setClientProxy(const Ice::ObjectPrx& proxy) +{ + Lock sync(*this); + _clientProxy = proxy; +} + +void +Database::setServerProxy(const Ice::ObjectPrx& proxy) +{ + Lock sync(*this); + _serverProxy = proxy; +} + +Ice::ObjectPrx +Database::getClientProxy() const +{ + Lock sync(*this); + return _clientProxy; +} + +Ice::ObjectPrx +Database::getServerProxy() const +{ + Lock sync(*this); + return _serverProxy; +} + +void +Database::updateReplicatedWellKnownObjects() +{ + Ice::ObjectPrx clientProxy = _replicaCache.getClientProxy(getClientProxy()); + Ice::Identity id; + id.category = _instanceName; + id.name = "Query"; + ObjectInfo info; + info.type = Query::ice_staticId(); + info.proxy = clientProxy->ice_identity(id); + addObject(info, true); +} + +void Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc, int masterSerial) { ServerEntrySeq entries; @@ -640,6 +665,11 @@ void Database::addNode(const string& name, const NodeSessionIPtr& session) { _nodeCache.get(name, true)->setSession(session); + + ObjectInfo info; + info.type = Node::ice_staticId(); + info.proxy = session->getNode(); + addObject(info, true); } NodePrx @@ -655,16 +685,22 @@ Database::getNodeInfo(const string& name) const } void -Database::removeNode(const string& name) +Database::removeNode(const string& name, const NodeSessionIPtr& session, bool shutdown) { + if(!shutdown) + { + removeObject(session->getNode()->ice_getIdentity()); + } + // // We must notify the observer first (there's an assert in the // observer to ensure that only nodes which are up are teared // down). // - if(_nodeObserverTopic) + NodeObserverTopicPtr topic = getNodeObserverTopic(); + if(topic) { - _nodeObserverTopic->getPublisher()->nodeDown(name); + topic->getPublisher()->nodeDown(name); } _nodeCache.get(name)->setSession(0); @@ -673,19 +709,35 @@ Database::removeNode(const string& name) void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { - _replicaCache.add(name, session, this); -} + ReplicaEntryPtr entry = _replicaCache.add(name, session); -InternalRegistryPrxSeq -Database::getReplicas() const -{ - return _replicaCache.getAll(); + ObjectInfo info; + info.type = InternalRegistry::ice_staticId(); + info.proxy = session->getProxy(); + addObject(info, true); + + RegistryObserverTopicPtr topic = getRegistryObserverTopic(); + if(topic) + { + topic->subscribe(session->getObserver()); + } } void -Database::removeReplica(const string& name) +Database::removeReplica(const string& name, const ReplicaSessionIPtr& session, bool shutdown) { - _replicaCache.remove(name, this); + if(!shutdown) + { + removeObject(session->getProxy()->ice_getIdentity()); + } + + RegistryObserverTopicPtr topic = getRegistryObserverTopic(); + if(topic) + { + topic->unsubscribe(session->getObserver()); + } + + _replicaCache.remove(name); } Ice::StringSeq @@ -1046,6 +1098,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int { int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); + bool update = false; { Lock sync(*this); if(_objectCache.has(id)) @@ -1053,9 +1106,16 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int throw ObjectExistsException(id); } - if(!replaceIfExistsInDatabase && _objects.find(id) != _objects.end()) + if(_objects.find(id) != _objects.end()) { - throw ObjectExistsException(id); + if(!replaceIfExistsInDatabase) + { + throw ObjectExistsException(id); + } + else + { + update = true; + } } _objects.put(IdentityObjectInfoDict::value_type(id, info)); @@ -1067,13 +1127,27 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int // if(_registryObserverTopic) { - _registryObserverTopic->getPublisher()->objectAdded(serial, info); + if(!update) + { + _registryObserverTopic->getPublisher()->objectAdded(serial, info); + } + else + { + _registryObserverTopic->getPublisher()->objectUpdated(serial, info); + } } if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "added object `" << _communicator->identityToString(id) << "'"; + if(!update) + { + out << "added object `" << _communicator->identityToString(id) << "'"; + } + else + { + out << "updated object `" << _communicator->identityToString(id) << "'"; + } } } |