diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 92 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Internal.ice | 18 | ||||
-rw-r--r-- | cpp/src/IceGrid/InternalRegistryI.cpp | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/InternalRegistryI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 76 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.cpp | 56 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.h | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 117 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 28 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 111 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/WellKnownObjectsManager.cpp | 28 | ||||
-rw-r--r-- | cpp/src/IceGrid/WellKnownObjectsManager.h | 4 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/Makefile | 12 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/Makefile.mak | 13 | ||||
-rw-r--r-- | cpp/test/IceGrid/allocation/application.xml | 8 |
19 files changed, 374 insertions, 218 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 084c01b2004..8ebbc069ed2 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -195,7 +195,6 @@ void Database::destroy() { _nodeCache.destroy(); // Break cyclic reference count. - _replicaCache.destroy(); } std::string @@ -285,6 +284,7 @@ Database::syncApplications(const ApplicationInfoSeq& applications) { Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); ServerEntrySeq entries; set<string> names; for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) @@ -325,6 +325,7 @@ Database::syncApplications(const ApplicationInfoSeq& applications) } } serial = ++_applicationSerial; + txHolder.commit(); } _applicationObserverTopic->applicationInit(serial, applications); @@ -336,14 +337,16 @@ Database::syncAdapters(const AdapterInfoSeq& adapters) int serial; { Lock sync(*this); + + Freeze::TransactionHolder txHolder(_connection); _adapters.clear(); for(AdapterInfoSeq::const_iterator r = adapters.end(); r != adapters.end(); ++r) { _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); } serial = ++_adapterSerial; - } - + txHolder.commit(); + } _adapterObserverTopic->adapterInit(serial, adapters); } @@ -354,8 +357,19 @@ Database::syncObjects(const ObjectInfoSeq& objects) int serial; { Lock sync(*this); + + Freeze::TransactionHolder txHolder(_connection); + + ObjectInfoSeq nodes; + for(IdentityObjectInfoDict::const_iterator p = _objects.findByType(Node::ice_staticId()); p != _objects.end(); + ++p) + { + nodes.push_back(p->second); + } + _objects.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + ObjectInfoSeq::const_iterator q; + for(q = objects.begin(); q != objects.end(); ++q) { const Ice::Identity& id = q->proxy->ice_getIdentity(); if(id.category != _instanceName || id.name.find("Node-") != 0) @@ -365,8 +379,18 @@ Database::syncObjects(const ObjectInfoSeq& objects) _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); } } + for(q = nodes.begin(); q != nodes.end(); ++q) + { + const Ice::Identity& id = q->proxy->ice_getIdentity(); + if(id.category == _instanceName || id.name.find("Node-") == 0) + { + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + } + } serial = ++_objectSerial; + txHolder.commit(); } + _objectObserverTopic->objectInit(serial, objects); } @@ -720,12 +744,7 @@ void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { _replicaCache.add(name, session); - _registryObserverTopic->registryUp(session->getInfo()); - - _applicationObserverTopic->subscribe(session->getObserver(), name); - _adapterObserverTopic->subscribe(session->getObserver(), name); - _objectObserverTopic->subscribe(session->getObserver(), name); } InternalRegistryPrx @@ -768,15 +787,10 @@ Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplication } void -Database::removeReplica(const string& name, const ReplicaSessionIPtr& session) +Database::removeReplica(const string& name, const ReplicaSessionIPtr& session, bool shutdown) { - _applicationObserverTopic->unsubscribe(session->getObserver(), name); - _adapterObserverTopic->unsubscribe(session->getObserver(), name); - _objectObserverTopic->unsubscribe(session->getObserver(), name); - _registryObserverTopic->registryDown(name); - - _replicaCache.remove(name); + _replicaCache.remove(name, shutdown); } Ice::StringSeq @@ -1271,39 +1285,11 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) updated.push_back(_objects.find(p->proxy->ice_getIdentity()) != _objects.end()); _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p)); } - serial = _objectSerial; - _objectSerial += static_cast<int>(static_cast<int>(objects.size())); + serial = ++_objectSerial; txHolder.commit(); } - vector<bool>::const_iterator q = updated.begin(); - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p, ++q) - { - // - // TODO: Add a better observer call? - // - if(!*q) - { - _objectObserverTopic->objectAdded(++serial, *p); - } - else - { - _objectObserverTopic->objectUpdated(++serial, *p); - } - - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - if(!*q) - { - out << "added object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; - } - else - { - out << "updated object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; - } - } - } + _objectObserverTopic->objectsAddedOrUpdated(serial, objects); } void @@ -1317,21 +1303,11 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) { _objects.erase(p->proxy->ice_getIdentity()); } - serial = _objectSerial; - _objectSerial += static_cast<int>(static_cast<int>(objects.size())); + serial = ++_objectSerial; txHolder.commit(); } - for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) - { - _objectObserverTopic->objectRemoved(++serial, p->proxy->ice_getIdentity()); - - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "removed object `" << _communicator->identityToString(p->proxy->ice_getIdentity()) << "'"; - } - } + _objectObserverTopic->objectsRemoved(serial, objects); } void diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index 9f93d00cdc8..af6fbd0a268 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -94,7 +94,7 @@ public: void replicaReceivedUpdate(const std::string&, TopicName, int, const std::string&); void waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr&, const std::string&, int); - void removeReplica(const std::string&, const ReplicaSessionIPtr&); + void removeReplica(const std::string&, const ReplicaSessionIPtr&, bool); Ice::StringSeq getAllReplicas(const std::string& = std::string()); ServerInfo getServerInfo(const std::string&, bool = false); diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index 163b0c87931..4820c2de6ed 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -351,6 +351,10 @@ enum TopicName ObjectObserverTopicName }; +interface DatabaseObserver extends ApplicationObserver, ObjectObserver, AdapterObserver +{ +}; + interface ReplicaSession { /** @@ -369,6 +373,14 @@ interface ReplicaSession /** * + * Set the database observer. Once the observer is subscribed, it + * will receive the database and database updates. + * + **/ + idempotent void setDatabaseObserver(DatabaseObserver* dbObs); + + /** + * * This method sets the endpoints of the replica. This allows the * master to create proxies with multiple endpoints for replicated * objects (e.g.: IceGrid::Query object). @@ -410,10 +422,6 @@ interface ReplicaSession void destroy(); }; -interface DatabaseObserver extends ApplicationObserver, ObjectObserver, AdapterObserver -{ -}; - interface InternalRegistry { /** @@ -437,7 +445,7 @@ interface InternalRegistry NodeSession* registerNode(string name, Node* nd, NodeInfo info) throws NodeActiveException; - ReplicaSession* registerReplica(string name, RegistryInfo info, InternalRegistry* prx, DatabaseObserver* dbObs) + ReplicaSession* registerReplica(string name, RegistryInfo info, InternalRegistry* prx) throws ReplicaActiveException; void registerWithReplica(InternalRegistry* prx); diff --git a/cpp/src/IceGrid/InternalRegistryI.cpp b/cpp/src/IceGrid/InternalRegistryI.cpp index acebca37ed8..0e773b8755b 100644 --- a/cpp/src/IceGrid/InternalRegistryI.cpp +++ b/cpp/src/IceGrid/InternalRegistryI.cpp @@ -132,13 +132,12 @@ ReplicaSessionPrx InternalRegistryI::registerReplica(const std::string& name, const RegistryInfo& info, const InternalRegistryPrx& registry, - const DatabaseObserverPrx& dbObserver, const Ice::Current& current) { try { ReplicaSessionIPtr session = new ReplicaSessionI(_database, _wellKnownObjects, name, info, registry, - dbObserver, _replicaSessionTimeout); + _replicaSessionTimeout); ReplicaSessionPrx proxy = ReplicaSessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); _reaper->add(new SessionReapable<ReplicaSessionI>(current.adapter, session, proxy), _replicaSessionTimeout); return proxy; diff --git a/cpp/src/IceGrid/InternalRegistryI.h b/cpp/src/IceGrid/InternalRegistryI.h index 1287b633be4..fd30e1b4baa 100644 --- a/cpp/src/IceGrid/InternalRegistryI.h +++ b/cpp/src/IceGrid/InternalRegistryI.h @@ -39,8 +39,8 @@ public: virtual ~InternalRegistryI(); virtual NodeSessionPrx registerNode(const std::string&, const NodePrx&, const NodeInfo&, const Ice::Current&); - virtual ReplicaSessionPrx registerReplica(const std::string&, const RegistryInfo&, const InternalRegistryPrx&, - const DatabaseObserverPrx&, const Ice::Current&); + virtual ReplicaSessionPrx registerReplica(const std::string&, const RegistryInfo&, const InternalRegistryPrx&, + const Ice::Current&); virtual void registerWithReplica(const InternalRegistryPrx&, const Ice::Current&); diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index c217a205ea7..28bf4bd21cd 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -34,12 +34,6 @@ ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceSt const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher()); } -void -ReplicaCache::destroy() -{ - _entries.clear(); -} - ReplicaEntryPtr ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { @@ -83,16 +77,25 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { _nodes->replicaAdded(session->getInternalRegistry()); } - catch(const Ice::LocalException&) + catch(const Ice::ConnectionRefusedException&) { - // TODO: XXX + // Expected if the replica is being shutdown. + } + catch(const Ice::LocalException& ex) + { + TraceLevelsPtr traceLevels = getTraceLevels(); + if(traceLevels) + { + Ice::Warning out(traceLevels->logger); + out << "unexpected exception while publishing `replicaAdded' update:\n" << ex; + } } return entry; } ReplicaEntryPtr -ReplicaCache::remove(const string& name) +ReplicaCache::remove(const string& name, bool shutdown) { ReplicaEntryPtr entry; { @@ -108,14 +111,26 @@ ReplicaCache::remove(const string& name) out << "replica `" << name << "' down"; } } - - try - { - _nodes->replicaRemoved(entry->getSession()->getInternalRegistry()); - } - catch(const Ice::LocalException&) + + if(!shutdown) { - // TODO: XXX + try + { + _nodes->replicaRemoved(entry->getSession()->getInternalRegistry()); + } + catch(const Ice::ConnectionRefusedException&) + { + // Expected if the replica is being shutdown. + } + catch(const Ice::LocalException& ex) + { + TraceLevelsPtr traceLevels = getTraceLevels(); + if(traceLevels) + { + Ice::Warning out(traceLevels->logger); + out << "unexpected exception while publishing `replicaRemoved' update:\n" << ex; + } + } } return entry; @@ -144,9 +159,18 @@ ReplicaCache::nodeAdded(const NodePrx& node) { _topic->subscribe(qos, node); } - catch(const Ice::LocalException&) + catch(const Ice::ConnectionRefusedException& ex) { - // TODO: XXX + // The replica is being shutdown. + } + catch(const Ice::LocalException& ex) + { + TraceLevelsPtr traceLevels = getTraceLevels(); + if(traceLevels) + { + Ice::Warning out(traceLevels->logger); + out << "unexpected exception while subscribing node from replica observer topic:\n" << ex; + } } } @@ -157,13 +181,18 @@ ReplicaCache::nodeRemoved(const NodePrx& node) { _topic->unsubscribe(node); } - catch(const Ice::ConnectionRefusedException&) + catch(const Ice::ConnectionRefusedException& ex) { // The replica is being shutdown. } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { - // TODO: XXX + TraceLevelsPtr traceLevels = getTraceLevels(); + if(traceLevels) + { + Ice::Warning out(traceLevels->logger); + out << "unexpected exception while unsubscribing node from replica observer topic:\n" << ex; + } } } @@ -178,6 +207,7 @@ ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) cons endpoints.insert(endpoints.end(), endpts.begin(), endpts.end()); } + Lock sync(*this); for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p) { Ice::ObjectPrx prx = p->second->getSession()->getEndpoint(name); @@ -197,6 +227,10 @@ ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& se { } +ReplicaEntry::~ReplicaEntry() +{ +} + const ReplicaSessionIPtr& ReplicaEntry::getSession() const { diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h index 2bc776ca108..bcb0c982d3a 100644 --- a/cpp/src/IceGrid/ReplicaCache.h +++ b/cpp/src/IceGrid/ReplicaCache.h @@ -29,6 +29,7 @@ class ReplicaEntry : public IceUtil::Shared public: ReplicaEntry(const std::string&, const ReplicaSessionIPtr&); + virtual ~ReplicaEntry(); bool canRemove() const { return true; } const ReplicaSessionIPtr& getSession() const; @@ -47,10 +48,9 @@ class ReplicaCache : public CacheByString<ReplicaEntry> public: ReplicaCache(const Ice::CommunicatorPtr&, const IceStorm::TopicManagerPrx&); - void destroy(); ReplicaEntryPtr add(const std::string&, const ReplicaSessionIPtr&); - ReplicaEntryPtr remove(const std::string&); + ReplicaEntryPtr remove(const std::string&, bool); ReplicaEntryPtr get(const std::string&) const; void nodeAdded(const NodePrx&); diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index 89e0ee02c9e..26da8a7138d 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -21,14 +21,12 @@ ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database, const string& name, const RegistryInfo& info, const InternalRegistryPrx& proxy, - const DatabaseObserverPrx& databaseObserver, int timeout) : _database(database), _wellKnownObjects(wellKnownObjects), _traceLevels(database->getTraceLevels()), _name(name), _internalRegistry(InternalRegistryPrx::uncheckedCast(proxy->ice_timeout(timeout * 1000))), - _databaseObserver(databaseObserver), _info(info), _timeout(timeout), _timestamp(IceUtil::Time::now()), @@ -72,6 +70,20 @@ ReplicaSessionI::getTimeout(const Ice::Current& current) const } void +ReplicaSessionI::setDatabaseObserver(const DatabaseObserverPrx& observer, const Ice::Current& current) +{ + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + _observer = observer; + _database->getObserverTopic(ApplicationObserverTopicName)->subscribe(_observer, _name); + _database->getObserverTopic(AdapterObserverTopicName)->subscribe(_observer, _name); + _database->getObserverTopic(ObjectObserverTopicName)->subscribe(_observer, _name); +} + +void ReplicaSessionI::setEndpoints(const StringObjectProxyDict& endpoints, const Ice::Current& current) { { @@ -88,15 +100,13 @@ ReplicaSessionI::setEndpoints(const StringObjectProxyDict& endpoints, const Ice: void ReplicaSessionI::registerWellKnownObjects(const ObjectInfoSeq& objects, const Ice::Current& current) { + Lock sync(*this); + if(_destroy) { - Lock sync(*this); - if(_destroy) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - _replicaWellKnownObjects = objects; + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } - _wellKnownObjects->registerWellKnownObjects(objects); + _replicaWellKnownObjects = objects; + _database->addOrUpdateObjectsInDatabase(objects); } void @@ -126,17 +136,29 @@ ReplicaSessionI::destroy(const Ice::Current& current) } _destroy = true; } - _database->removeReplica(_name, this); - _wellKnownObjects->unregisterWellKnownObjects(_replicaWellKnownObjects); - if(shutdown) + if(_observer) { - ObjectInfo info; - info.type = InternalRegistry::ice_staticId(); - info.proxy = _internalRegistry; - _database->addObject(info, true); + _database->getObserverTopic(ApplicationObserverTopicName)->unsubscribe(_observer, _name); + _database->getObserverTopic(AdapterObserverTopicName)->unsubscribe(_observer, _name); + _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _name); } - else + + if(!_replicaWellKnownObjects.empty()) + { + _database->removeObjectsInDatabase(_replicaWellKnownObjects); + if(shutdown) + { + ObjectInfo info; + info.type = InternalRegistry::ice_staticId(); + info.proxy = _internalRegistry; + _database->addObject(info, true); + } + } + + _database->removeReplica(_name, this, shutdown); + + if(!shutdown) { _wellKnownObjects->updateReplicatedWellKnownObjects(); // No need to update these if we're shutting down. } diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h index db30d6f0330..25553f0843a 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.h +++ b/cpp/src/IceGrid/ReplicaSessionI.h @@ -30,10 +30,11 @@ class ReplicaSessionI : public ReplicaSession, public IceUtil::Mutex public: ReplicaSessionI(const DatabasePtr&, const WellKnownObjectsManagerPtr&, const std::string&, const RegistryInfo&, - const InternalRegistryPrx&, const DatabaseObserverPrx&, int); + const InternalRegistryPrx&, int); virtual void keepAlive(const Ice::Current&); virtual int getTimeout(const Ice::Current&) const; + virtual void setDatabaseObserver(const DatabaseObserverPrx&, const Ice::Current&); virtual void setEndpoints(const StringObjectProxyDict&, const Ice::Current&); virtual void registerWellKnownObjects(const ObjectInfoSeq&, const Ice::Current&); virtual void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&, @@ -44,7 +45,6 @@ public: virtual IceUtil::Time timestamp() const; const InternalRegistryPrx& getInternalRegistry() const { return _internalRegistry; } - const DatabaseObserverPrx& getObserver() const { return _databaseObserver; } const RegistryInfo& getInfo() const { return _info; } Ice::ObjectPrx getEndpoint(const std::string&); @@ -57,9 +57,9 @@ private: const TraceLevelsPtr _traceLevels; const std::string _name; const InternalRegistryPrx _internalRegistry; - const DatabaseObserverPrx _databaseObserver; const RegistryInfo _info; const int _timeout; + DatabaseObserverPrx _observer; ObjectInfoSeq _replicaWellKnownObjects; StringObjectProxyDict _replicaEndpoints; IceUtil::Time _timestamp; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 6feb9f41403..705dd2e3eb9 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -23,9 +23,12 @@ class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex { public: - MasterDatabaseObserverI(const DatabasePtr& database, ReplicaSessionManager& manager) : + MasterDatabaseObserverI(const ReplicaSessionManager::ThreadPtr& thread, + const DatabasePtr& database, + const ReplicaSessionPrx& session) : + _thread(thread), _database(database), - _manager(manager) + _session(session) { } @@ -33,6 +36,7 @@ public: applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current) { _database->syncApplications(applications); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx)); } virtual void @@ -49,7 +53,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -66,7 +70,7 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -89,13 +93,14 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current) { _database->syncAdapters(adapters); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx)); } virtual void @@ -106,7 +111,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -117,7 +122,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -128,13 +133,14 @@ public: { failure = "adapter `" + id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void objectInit(const ObjectInfoSeq& objects, const Ice::Current& current) { _database->syncObjects(objects); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx)); } virtual void @@ -158,7 +164,7 @@ public: os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity()); failure = os.str(); } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -181,7 +187,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -206,7 +212,7 @@ public: catch(const ObjectNotRegisteredException&) { } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } private: @@ -225,8 +231,25 @@ private: return -1; } + void + receivedUpdate(TopicName name, int serial, const string& failure = string()) + { + try + { + _session->receivedUpdate(name, serial, failure); + } + catch(const Ice::LocalException&) + { + } + if(!failure.empty()) + { + _thread->destroyActiveSession(); + } + } + + const ReplicaSessionManager::ThreadPtr _thread; const DatabasePtr _database; - ReplicaSessionManager& _manager; + const ReplicaSessionPrx _session; }; @@ -243,19 +266,13 @@ ReplicaSessionManager::create(const string& name, const WellKnownObjectsManagerPtr& wellKnownObjects, const InternalRegistryPrx& internalRegistry) { - Ice::CommunicatorPtr communicator = database->getCommunicator(); - string instanceName = communicator->getDefaultLocator()->ice_getIdentity().category; + Ice::CommunicatorPtr comm = database->getCommunicator(); + string instName = comm->getDefaultLocator()->ice_getIdentity().category; { Lock sync(*this); - _master = - InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry-Master")); - - Ice::ObjectPrx obsv = - database->getInternalAdapter()->addWithUUID(new MasterDatabaseObserverI(database, *this)); - - _observer = DatabaseObserverPrx::uncheckedCast(obsv); + _master = InternalRegistryPrx::uncheckedCast(comm->stringToProxy(instName + "/InternalRegistry-Master")); _name = name; _info = info; @@ -269,7 +286,7 @@ ReplicaSessionManager::create(const string& name, notifyAll(); } - _thread->tryCreateSession(_master); + _thread->tryCreateSession(0); } void @@ -318,27 +335,6 @@ ReplicaSessionManager::destroy() _thread->terminate(); _thread->getThreadControl().join(); - _thread = 0; -} - -void -ReplicaSessionManager::receivedUpdate(TopicName name, int serial, const string& failure) -{ - ReplicaSessionPrx session = _thread->getSession(); - if(session) - { - try - { - session->receivedUpdate(name, serial, failure); - } - catch(const Ice::LocalException&) - { - } - } - if(!failure.empty()) - { - _thread->destroyActiveSession(); - } } void @@ -403,12 +399,21 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti out << "trying to establish session with master replica"; } - ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry, _observer); + ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry); int t = session->getTimeout(); if(t > 0) { timeout = IceUtil::Time::seconds(t / 2); } + + // + // Create a new database observer servant and give its proxy + // to the session so that it can subscribe it. This call only + // returns once the observer is subscribed and initialized. + // + DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session); + _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant)); + session->setDatabaseObserver(_observer); // // Register all the well-known objects with the replica session. @@ -433,6 +438,18 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti } catch(const Ice::LocalException& ex) { + if(_observer) + { + try + { + _database->getInternalAdapter()->remove(_observer->ice_getIdentity()); + } + catch(const Ice::LocalException&) + { + } + _observer = 0; + } + // // Re-register all the well known objects with the local database. // @@ -468,5 +485,17 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) out << "couldn't destroy master replica session:\n" << ex; } } + + if(_observer) + { + try + { + _database->getInternalAdapter()->remove(_observer->ice_getIdentity()); + } + catch(const Ice::LocalException&) + { + } + _observer = 0; + } } diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index 6f895755ec6..15c918b8bbd 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -32,21 +32,6 @@ typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; class ReplicaSessionManager : public IceUtil::Monitor<IceUtil::Mutex> { public: - - ReplicaSessionManager(); - - void create(const std::string&, const RegistryInfo&, const DatabasePtr&, const WellKnownObjectsManagerPtr&, - const InternalRegistryPrx&); - void create(const InternalRegistryPrx&); - NodePrxSeq getNodes() const; - void destroy(); - - void receivedUpdate(TopicName, int, const std::string&); - void registerAllWellKnownObjects(); - ReplicaSessionPrx getSession() const { return _thread->getSession(); } - -private: - class Thread : public SessionKeepAliveThread<ReplicaSessionPrx, InternalRegistryPrx> { public: @@ -83,6 +68,19 @@ private: }; typedef IceUtil::Handle<Thread> ThreadPtr; + ReplicaSessionManager(); + + void create(const std::string&, const RegistryInfo&, const DatabasePtr&, const WellKnownObjectsManagerPtr&, + const InternalRegistryPrx&); + void create(const InternalRegistryPrx&); + NodePrxSeq getNodes() const; + void destroy(); + + void registerAllWellKnownObjects(); + ReplicaSessionPrx getSession() const { return _thread->getSession(); } + +private: + friend class Thread; ReplicaSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&); diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index fc358d19a87..ea656dccb42 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -74,7 +74,7 @@ public: } // - // If we failed to create the session with the factory an + // If we failed to create the session with the factory and // the factory proxy is a direct proxy, we check with the // Query interface if the factory proxy was updated. It's // possible that the factory was restarted for example. diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 48dbaea12dd..cbb3f7766dc 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -43,7 +43,7 @@ ObserverTopic::~ObserverTopic() } void -ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial) +ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { Lock sync(*this); if(!_topic) @@ -57,7 +57,9 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int ser if(!name.empty()) { + assert(_syncSubscribers.find(name) == _syncSubscribers.end()); _syncSubscribers.insert(name); + waitForSyncedSubscribers(_serial, name); } } @@ -72,6 +74,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) if(!name.empty()) { + assert(_syncSubscribers.find(name) != _syncSubscribers.end()); _syncSubscribers.erase(name); map<int, set<string> >::iterator p = _waitForUpdates.begin(); @@ -109,7 +112,6 @@ void ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure) { Lock sync(*this); - map<int, set<string> >::iterator p = _waitForUpdates.find(serial); if(p != _waitForUpdates.end()) { @@ -134,14 +136,22 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail } void -ObserverTopic::waitForSyncedSubscribers(int serial) +ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) { - if(_syncSubscribers.empty()) + if(_syncSubscribers.empty() && name.empty()) { return; } - _waitForUpdates.insert(make_pair(serial, _syncSubscribers)); + if(name.empty()) + { + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; + } + else + { + _waitForUpdates[serial].insert(name); + } // // Wait until all the updates are received. @@ -262,7 +272,7 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { registries.push_back(p->second); } - observer->registryInit(registries, getContext(-1)); + observer->registryInit(registries, getContext(_serial)); } NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -454,7 +464,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { nodes.push_back(p->second); } - observer->nodeInit(nodes, getContext(-1)); + observer->nodeInit(nodes, getContext(_serial)); } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -596,7 +606,7 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { applications.push_back(p->second); } - observer->applicationInit(_serial, applications, getContext(-1)); + observer->applicationInit(_serial, applications, getContext(_serial)); } AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -707,7 +717,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { adapters.push_back(p->second); } - observer->adapterInit(adapters, getContext(-1)); + observer->adapterInit(adapters, getContext(_serial)); } ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -810,6 +820,87 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) } void +ObjectObserverTopic::objectsAddedOrUpdated(int serial, const ObjectInfoSeq& infos) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); + + for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity()); + if(q != _objects.end()) + { + q->second = *p; + try + { + _publisher->objectUpdated(*p, getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + } + } + else + { + _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p)); + try + { + _publisher->objectAdded(*p, getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectAdded' update:\n" << ex; + } + } + } + + // + // We don't need to wait for the update to be received by the + // replicas here. This operation is only called internaly by + // IceGrid. + // + //waitForSyncedSubscribers(serial); +} + +void +ObjectObserverTopic::objectsRemoved(int serial, const ObjectInfoSeq& infos) +{ + Lock sync(*this); + if(!_topic) + { + return; + } + updateSerial(serial); + + for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + _objects.erase(p->proxy->ice_getIdentity()); + try + { + _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(serial)); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; + } + } + + // + // We don't need to wait for the update to be received by the + // replicas here. This operation is only called internaly by + // IceGrid. + // + //waitForSyncedSubscribers(serial); +} + +void ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); @@ -818,5 +909,5 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) { objects.push_back(p->second); } - observer->objectInit(objects, getContext(-1)); + observer->objectInit(objects, getContext(_serial)); } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index f8fc0312c44..56554537ce9 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -28,7 +28,7 @@ public: ObserverTopic(const IceStorm::TopicManagerPrx&, const std::string&); virtual ~ObserverTopic(); - void subscribe(const Ice::ObjectPrx&, const std::string& = std::string(), int = -1); + void subscribe(const Ice::ObjectPrx&, const std::string& = std::string()); void unsubscribe(const Ice::ObjectPrx&, const std::string& = std::string()); void destroy(); @@ -38,14 +38,13 @@ public: protected: - void waitForSyncedSubscribers(int); + void waitForSyncedSubscribers(int, const std::string& = std::string()); void updateSerial(int); Ice::Context getContext(int) const; Ice::LoggerPtr _logger; IceStorm::TopicPrx _topic; Ice::ObjectPrx _basePublisher; - std::set<Ice::Identity> _waitForSubscribe; int _serial; std::set<std::string> _syncSubscribers; @@ -148,6 +147,9 @@ public: void objectUpdated(int, const ObjectInfo&); void objectRemoved(int, const Ice::Identity&); + void objectsAddedOrUpdated(int, const ObjectInfoSeq&); + void objectsRemoved(int, const ObjectInfoSeq&); + virtual void initObserver(const Ice::ObjectPrx&); private: diff --git a/cpp/src/IceGrid/WellKnownObjectsManager.cpp b/cpp/src/IceGrid/WellKnownObjectsManager.cpp index 25e0c9e52f6..fde27fc5bdb 100644 --- a/cpp/src/IceGrid/WellKnownObjectsManager.cpp +++ b/cpp/src/IceGrid/WellKnownObjectsManager.cpp @@ -72,19 +72,7 @@ WellKnownObjectsManager::registerAll() // If initialized, the endpoints and well known objects are immutable. // updateReplicatedWellKnownObjects(); - registerWellKnownObjects(_wellKnownObjects); -} - -void -WellKnownObjectsManager::registerWellKnownObjects(const ObjectInfoSeq& objects) -{ - _database->addOrUpdateObjectsInDatabase(objects); -} - -void -WellKnownObjectsManager::unregisterWellKnownObjects(const ObjectInfoSeq& objects) -{ - _database->removeObjectsInDatabase(objects); + _database->addOrUpdateObjectsInDatabase(_wellKnownObjects); } void @@ -103,6 +91,8 @@ WellKnownObjectsManager::updateReplicatedWellKnownObjects() ObjectInfo info; ObjectInfoSeq objects; + Lock sync(*this); + Ice::ObjectPrx replicatedClientProxy = _database->getReplicatedEndpoints("Client", _endpoints["Client"]); id.name = "Query"; @@ -115,18 +105,6 @@ WellKnownObjectsManager::updateReplicatedWellKnownObjects() info.proxy = replicatedClientProxy->ice_identity(id); objects.push_back(info); - Ice::ObjectPrx replicatedInternalProxy = _database->getReplicatedEndpoints("Internal", _endpoints["Internal"]); - - id.name = "NullPermissionsVerifier"; - info.type = Glacier2::PermissionsVerifier::ice_staticId(); - info.proxy = replicatedInternalProxy->ice_identity(id); - objects.push_back(info); - - id.name = "NullSSLPermissionsVerifier"; - info.type = Glacier2::SSLPermissionsVerifier::ice_staticId(); - info.proxy = replicatedInternalProxy->ice_identity(id); - objects.push_back(info); - _database->addOrUpdateObjectsInDatabase(objects); } diff --git a/cpp/src/IceGrid/WellKnownObjectsManager.h b/cpp/src/IceGrid/WellKnownObjectsManager.h index b17a077e0b5..43112c42b4d 100644 --- a/cpp/src/IceGrid/WellKnownObjectsManager.h +++ b/cpp/src/IceGrid/WellKnownObjectsManager.h @@ -30,10 +30,6 @@ public: void registerAll(); void registerAll(const ReplicaSessionPrx&); - - void registerWellKnownObjects(const ObjectInfoSeq&); - void unregisterWellKnownObjects(const ObjectInfoSeq&); - void updateReplicatedWellKnownObjects(); Ice::ObjectPrx getEndpoints(const std::string&); diff --git a/cpp/test/IceGrid/allocation/Makefile b/cpp/test/IceGrid/allocation/Makefile index a821b57fb1d..8aad13a7af0 100644 --- a/cpp/test/IceGrid/allocation/Makefile +++ b/cpp/test/IceGrid/allocation/Makefile @@ -11,8 +11,9 @@ top_srcdir = ../../.. CLIENT = client SERVER = server +VERIFIER = verifier -TARGETS = $(CLIENT) $(SERVER) +TARGETS = $(CLIENT) $(SERVER) $(VERIFIER) OBJS = Test.o @@ -22,9 +23,12 @@ COBJS = Client.o \ SOBJS = TestI.o \ Server.o +VOBJS = PermissionsVerifier.o + SRCS = $(OBJS:.o=.cpp) \ $(COBJS:.o=.cpp) \ - $(SOBJS:.o=.cpp) + $(SOBJS:.o=.cpp) \ + $(VOBJS:.obj=.cpp) SLICE_SRCS = Test.ice @@ -41,6 +45,10 @@ $(SERVER): $(OBJS) $(SOBJS) rm -f $@ $(CXX) $(LDFLAGS) -o $@ $(OBJS) $(SOBJS) $(LIBS) +$(VERIFIER): $(VOBJS) + rm -f $@ + $(CXX) $(LDFLAGS) -o $@ $(VOBJS) -lGlacier2 $(LIBS) + clean:: rm -rf db/node db/registry db/node-1 db/node-2 diff --git a/cpp/test/IceGrid/allocation/Makefile.mak b/cpp/test/IceGrid/allocation/Makefile.mak index 4717db7af67..f8b9c45d334 100644 --- a/cpp/test/IceGrid/allocation/Makefile.mak +++ b/cpp/test/IceGrid/allocation/Makefile.mak @@ -11,8 +11,9 @@ top_srcdir = ..\..\.. CLIENT = client.exe SERVER = server.exe +VERIFIER = verifier.exe -TARGETS = $(CLIENT) $(SERVER) +TARGETS = $(CLIENT) $(SERVER) $(VERIFIER) COBJS = Test.obj \ Client.obj \ @@ -22,8 +23,11 @@ SOBJS = Test.obj \ TestI.obj \ Server.obj +VOBJS = PermissionsVerifier.obj + SRCS = $(COBJS:.obj=.cpp) \ - $(SOBJS:.obj=.cpp) + $(SOBJS:.obj=.cpp) \ + $(VOBJS:.obj=.cpp) !include $(top_srcdir)/config/Make.rules.mak @@ -33,6 +37,7 @@ LINKWITH = $(LIBS) icegrid$(LIBSUFFIX).lib glacier2$(LIBSUFFIX).lib !if "$(BORLAND_HOME)" == "" & "$(OPTIMIZE)" != "yes" CPDBFLAGS = /pdb:$(CLIENT:.exe=.pdb) SPDBFLAGS = /pdb:$(SERVER:.exe=.pdb) +VPDBFLAGS = /pdb:$(VERIFIER:.exe=.pdb) !endif $(CLIENT): $(COBJS) @@ -43,6 +48,10 @@ $(SERVER): $(SOBJS) del /q $@ $(LINK) $(LD_EXEFLAGS) $(SPDBFLAGS) $(SOBJS) $(PREOUT)$@ $(PRELIBS)$(LIBS) +$(VERIFIER): $(VOBJS) + del /q $@ + $(LINK) $(LD_EXEFLAGS) $(VPDBFLAGS) $(VOBJS) $(PREOUT)$@ $(PRELIBS)$(LINKWITH) + clean:: del /q Test.cpp Test.h diff --git a/cpp/test/IceGrid/allocation/application.xml b/cpp/test/IceGrid/allocation/application.xml index 4f49206d7e7..cb603838a55 100644 --- a/cpp/test/IceGrid/allocation/application.xml +++ b/cpp/test/IceGrid/allocation/application.xml @@ -25,7 +25,7 @@ <server-instance template="Glacier2" id="Glacier2" endpoints="default -p 12347 -h 127.0.0.1 -t 10000" - verifier="IceGrid/NullPermissionsVerifier" + verifier="PermissionsVerifier" manager="IceGrid/SessionManager"/> <server id="ObjectAllocation" exe="${test.dir}/server" activation="on-demand" pwd="."> @@ -38,6 +38,12 @@ </adapter> </server> + <server id="PermissionsVerifier" exe="${test.dir}/verifier" activation="on-demand"> + <adapter name="PermissionsVerifier" endpoints="default"> + <object identity="PermissionsVerifier" type="::Glacier2::PermissionsVerifier"/> + </adapter> + </server> + <server id="ServerAllocation" exe="${test.dir}/server" activation="on-demand" pwd="." allocatable="true"> <adapter name="Server" endpoints="default" id="ServerAlloc"> <allocatable identity="allocatable3" type="::TestServer1"/> |