diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 371 |
1 files changed, 139 insertions, 232 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 6731a14ffdb..8e9cfe33de1 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -14,8 +14,6 @@ #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> #include <IceStorm/Subscriber.h> -#include <IceStorm/SubscriberMap.h> -#include <IceStorm/LLUMap.h> #include <IceStorm/Util.h> #include <Ice/SliceChecksums.h> @@ -26,38 +24,25 @@ using namespace IceStorm; using namespace IceStormElection; using namespace IceStormInternal; -using namespace Freeze; - namespace { -const string subscriberDbName = "subscribers"; - void -halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) +logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex) { - { - Ice::Error error(com->getLogger()); - error << "fatal exception: " << ex << "\n*** Aborting application ***"; - } - - abort(); + Ice::Error error(com->getLogger()); + error << "LMDB error: " << ex; } class TopicManagerI : public TopicManagerInternal { public: - TopicManagerI(const InstancePtr& instance, const TopicManagerImplPtr& impl) : + TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) : _instance(instance), _impl(impl) { } - ~TopicManagerI() - { - //cout << "~TopicManagerI" << endl; - } - virtual TopicPrx create(const string& id, const Ice::Current&) { while(true) @@ -130,7 +115,7 @@ private: } } - const InstancePtr _instance; + const PersistentInstancePtr _instance; const TopicManagerImplPtr _impl; }; @@ -138,17 +123,12 @@ class ReplicaObserverI : public ReplicaObserver { public: - ReplicaObserverI(const InstancePtr& instance, const TopicManagerImplPtr& impl) : + ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) : _instance(instance), _impl(impl) { } - ~ReplicaObserverI() - { - //cout << "~ReplicaObserverI" << endl; - } - virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&) { NodeIPtr node = _instance->node(); @@ -164,7 +144,6 @@ public: try { ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); - //cout << "createTopic: " << llu.generation << " node generation: " << unlock.generation() << endl; _impl->observerCreateTopic(llu, name); } catch(const ObserverInconsistencyException& e) @@ -228,7 +207,7 @@ public: private: - const InstancePtr _instance; + const PersistentInstancePtr _instance; const TopicManagerImplPtr _impl; }; @@ -241,11 +220,6 @@ public: { } - ~TopicManagerSyncI() - { - //cout << "~TopicManagerSyncI" << endl; - } - virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&) { _impl->getContent(llu, content); @@ -258,42 +232,10 @@ private: } -namespace IceStorm -{ - -string -identityToTopicName(const Ice::Identity& id) -{ - // - // Work out the topic name. If the category is empty then we're in - // backwards compatibility mode and the name is just - // identity.name. Otherwise identity.name is topic.<topicname>. - // - if(id.category.empty()) - { - return id.name; - } - - assert(id.name.length() > 6 && id.name.compare(0, 6, "topic.") == 0); - return id.name.substr(6); -} - -Ice::Identity -nameToIdentity(const InstancePtr& instance, const string& name) -{ - // Identity is instanceName>/topic.<topicname> - Ice::Identity id; - id.category = instance->instanceName(); - id.name = "topic." + name; - - return id; -} - -} - -TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : +TopicManagerImpl::TopicManagerImpl(const PersistentInstancePtr& instance) : _instance(instance), - _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())) + _lluMap(instance->lluMap()), + _subscriberMap(instance->subscriberMap()) { try { @@ -309,7 +251,6 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : // here. _managerImpl = new TopicManagerI(instance, this); - Ice::PropertiesPtr properties = _instance->communicator()->getProperties(); // If there is no node adapter we don't need to start the // observer, nor sync since we're not replicating. if(_instance->nodeAdapter()) @@ -320,35 +261,43 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl); } - - - // Ensure that the llu counter is present in the log. - LogUpdate empty = {0, 0}; - putLLU(_connection, empty); - - // Recreate each of the topics. - SubscriberMap subscriberMap(_connection, subscriberDbName); - SubscriberMap::const_iterator p = subscriberMap.begin(); - while(p != subscriberMap.end()) { - // This record has to be a place holder record, otherwise - // there is a database bug. - assert(p->first.id.name.empty() && p->first.id.category.empty()); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - Ice::Identity topic = p->first.topic; + // Ensure that the llu counter is present in the log. + LogUpdate empty = {0, 0}; + _instance->lluMap().put(txn, lluDbKey, empty); - // Skip the place holder. - ++p; - SubscriberRecordSeq content; - while(p != subscriberMap.end() && p->first.topic == topic) + // Recreate each of the topics. + SubscriberRecordKey k; + SubscriberRecord v; + + SubscriberMapRWCursor cursor(_subscriberMap, txn); + if(cursor.get(k, v, MDB_FIRST)) { - content.push_back(p->second); - ++p; + bool moreTopics = false; + do + { + // This record has to be a place holder record, otherwise + // there is a database bug. + assert(k.id.name.empty() && k.id.category.empty()); + + Ice::Identity topic = k.topic; + + bool moreTopics; + SubscriberRecordSeq content; + while((moreTopics = cursor.get(k, v, MDB_NEXT)) && k.topic == topic) + { + content.push_back(v); + } + + string name = identityToTopicName(topic); + installTopic(name, topic, false, content); + } while(moreTopics); } - string name = identityToTopicName(topic); - installTopic(name, topic, false, content); + txn.commit(); } } catch(...) @@ -360,11 +309,6 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : __setNoDelete(false); } -TopicManagerImpl::~TopicManagerImpl() -{ - //cout << "~TopicManagerImpl" << endl; -} - TopicPrx TopicManagerImpl::create(const string& name) { @@ -378,40 +322,30 @@ TopicManagerImpl::create(const string& name) throw ex; } - // Identity is instanceName>/topic.<topicname> + // Identity is <instanceName>/topic.<topicname> Ice::Identity id = nameToIdentity(_instance, name); - LogUpdate llu; - for(;;) + LogUpdate llu; + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - SubscriberRecordKey key; - key.topic = id; - SubscriberRecord rec; - rec.link = false; - rec.cost = 0; + SubscriberRecordKey key; + key.topic = id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.put(SubscriberMap::value_type(key, rec)); + _subscriberMap.put(txn, key, rec); - llu = getLLU(_connection); - llu.iteration++; - putLLU(_connection, llu); + llu = getIncrementedLLU(txn, _lluMap); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } _instance->observers()->createTopic(llu, name); @@ -484,47 +418,39 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont // First we update the database state, and then we update our // internal state. - for(;;) + try { - try - { - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - putLLU(_connection, llu); + _lluMap.put(txn, lluDbKey, llu); + + _subscriberMap.clear(txn); + + for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + { + SubscriberRecordKey key; + key.topic = p->id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; - SubscriberMap subscriberMap(_connection, subscriberDbName); - subscriberMap.clear(); + _subscriberMap.put(txn, key, rec); - for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) { SubscriberRecordKey key; key.topic = p->id; - SubscriberRecord rec; - rec.link = false; - rec.cost = 0; - - subscriberMap.put(SubscriberMap::value_type(key, rec)); + key.id = q->id; - for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) - { - SubscriberRecordKey key; - key.topic = p->id; - key.id = q->id; - - subscriberMap.put(SubscriberMap::value_type(key, *q)); - } + _subscriberMap.put(txn, key, *q); } - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); } + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } // We do this with two scans. The first runs through the topics @@ -586,39 +512,32 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) Lock sync(*this); Ice::Identity id = nameToIdentity(_instance, name); - for(;;) + try { - try - { - TransactionHolder txn(_connection); - - SubscriberRecordKey key; - key.topic = id; - SubscriberRecord rec; - rec.link = false; - rec.cost = 0; + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - SubscriberMap subscriberMap(_connection, subscriberDbName); - if(subscriberMap.find(key) != subscriberMap.end()) - { - throw ObserverInconsistencyException("topic exists: " + name); - } - subscriberMap.put(SubscriberMap::value_type(key, rec)); + SubscriberRecordKey key; + key.topic = id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; - putLLU(_connection, llu); - - txn.commit(); - break; - } - catch(const DeadlockException&) + if(_subscriberMap.find(txn, key)) { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); + throw ObserverInconsistencyException("topic exists: " + name); } + _subscriberMap.put(txn, key, rec); + + _lluMap.put(txn, lluDbKey, llu); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } + installTopic(name, id, true); } @@ -681,48 +600,41 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) reap(); } - for(;;) + try { - try + content.clear(); + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - content.clear(); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) - { - TopicContent rec = p->second->getContent(); - content.push_back(rec); - } - llu = getLLU(_connection); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); + TopicContent rec = p->second->getContent(); + content.push_back(rec); } + + IceDB::ReadOnlyTxn txn(_instance->dbEnv()); + _lluMap.get(txn, lluDbKey, llu); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } } LogUpdate TopicManagerImpl::getLastLogUpdate() const { - for(;;) + LogUpdate llu; + try { - try - { - return getLLU(_connection); - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) - { - halt(_instance->communicator(), ex); - } + IceDB::ReadOnlyTxn txn(_instance->dbEnv()); + _lluMap.get(txn, lluDbKey, llu); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } + + return llu; } void @@ -757,31 +669,26 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& // elected and gets the latest database state it immediately // updates the llu stamp. // - for(;;) + try { - try - { - content.clear(); + content.clear(); - TransactionHolder txn(_connection); + IceDB::ReadWriteTxn txn(_instance->dbEnv()); - for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) - { - TopicContent rec = p->second->getContent(); - content.push_back(rec); - } - putLLU(_connection, llu); - txn.commit(); - break; - } - catch(const DeadlockException&) - { - continue; - } - catch(const DatabaseException& ex) + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - halt(_instance->communicator(), ex); + TopicContent rec = p->second->getContent(); + content.push_back(rec); } + + _lluMap.put(txn, lluDbKey, llu); + + txn.commit(); + } + catch(const IceDB::LMDBException& ex) + { + logError(_instance->communicator(), ex); + throw; // will become UnknownException in caller } // Now initialize the observers. |