summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp371
1 files changed, 139 insertions, 232 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 6731a14ffdb..8e9cfe33de1 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -14,8 +14,6 @@
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
#include <IceStorm/Subscriber.h>
-#include <IceStorm/SubscriberMap.h>
-#include <IceStorm/LLUMap.h>
#include <IceStorm/Util.h>
#include <Ice/SliceChecksums.h>
@@ -26,38 +24,25 @@ using namespace IceStorm;
using namespace IceStormElection;
using namespace IceStormInternal;
-using namespace Freeze;
-
namespace
{
-const string subscriberDbName = "subscribers";
-
void
-halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
+logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
{
- {
- Ice::Error error(com->getLogger());
- error << "fatal exception: " << ex << "\n*** Aborting application ***";
- }
-
- abort();
+ Ice::Error error(com->getLogger());
+ error << "LMDB error: " << ex;
}
class TopicManagerI : public TopicManagerInternal
{
public:
- TopicManagerI(const InstancePtr& instance, const TopicManagerImplPtr& impl) :
+ TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
_instance(instance), _impl(impl)
{
}
- ~TopicManagerI()
- {
- //cout << "~TopicManagerI" << endl;
- }
-
virtual TopicPrx create(const string& id, const Ice::Current&)
{
while(true)
@@ -130,7 +115,7 @@ private:
}
}
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
const TopicManagerImplPtr _impl;
};
@@ -138,17 +123,12 @@ class ReplicaObserverI : public ReplicaObserver
{
public:
- ReplicaObserverI(const InstancePtr& instance, const TopicManagerImplPtr& impl) :
+ ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
_instance(instance),
_impl(impl)
{
}
- ~ReplicaObserverI()
- {
- //cout << "~ReplicaObserverI" << endl;
- }
-
virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&)
{
NodeIPtr node = _instance->node();
@@ -164,7 +144,6 @@ public:
try
{
ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
- //cout << "createTopic: " << llu.generation << " node generation: " << unlock.generation() << endl;
_impl->observerCreateTopic(llu, name);
}
catch(const ObserverInconsistencyException& e)
@@ -228,7 +207,7 @@ public:
private:
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
const TopicManagerImplPtr _impl;
};
@@ -241,11 +220,6 @@ public:
{
}
- ~TopicManagerSyncI()
- {
- //cout << "~TopicManagerSyncI" << endl;
- }
-
virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&)
{
_impl->getContent(llu, content);
@@ -258,42 +232,10 @@ private:
}
-namespace IceStorm
-{
-
-string
-identityToTopicName(const Ice::Identity& id)
-{
- //
- // Work out the topic name. If the category is empty then we're in
- // backwards compatibility mode and the name is just
- // identity.name. Otherwise identity.name is topic.<topicname>.
- //
- if(id.category.empty())
- {
- return id.name;
- }
-
- assert(id.name.length() > 6 && id.name.compare(0, 6, "topic.") == 0);
- return id.name.substr(6);
-}
-
-Ice::Identity
-nameToIdentity(const InstancePtr& instance, const string& name)
-{
- // Identity is instanceName>/topic.<topicname>
- Ice::Identity id;
- id.category = instance->instanceName();
- id.name = "topic." + name;
-
- return id;
-}
-
-}
-
-TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
+TopicManagerImpl::TopicManagerImpl(const PersistentInstancePtr& instance) :
_instance(instance),
- _connection(Freeze::createConnection(instance->communicator(), instance->serviceName()))
+ _lluMap(instance->lluMap()),
+ _subscriberMap(instance->subscriberMap())
{
try
{
@@ -309,7 +251,6 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
// here.
_managerImpl = new TopicManagerI(instance, this);
- Ice::PropertiesPtr properties = _instance->communicator()->getProperties();
// If there is no node adapter we don't need to start the
// observer, nor sync since we're not replicating.
if(_instance->nodeAdapter())
@@ -320,35 +261,43 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
_sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
}
-
-
- // Ensure that the llu counter is present in the log.
- LogUpdate empty = {0, 0};
- putLLU(_connection, empty);
-
- // Recreate each of the topics.
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- SubscriberMap::const_iterator p = subscriberMap.begin();
- while(p != subscriberMap.end())
{
- // This record has to be a place holder record, otherwise
- // there is a database bug.
- assert(p->first.id.name.empty() && p->first.id.category.empty());
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- Ice::Identity topic = p->first.topic;
+ // Ensure that the llu counter is present in the log.
+ LogUpdate empty = {0, 0};
+ _instance->lluMap().put(txn, lluDbKey, empty);
- // Skip the place holder.
- ++p;
- SubscriberRecordSeq content;
- while(p != subscriberMap.end() && p->first.topic == topic)
+ // Recreate each of the topics.
+ SubscriberRecordKey k;
+ SubscriberRecord v;
+
+ SubscriberMapRWCursor cursor(_subscriberMap, txn);
+ if(cursor.get(k, v, MDB_FIRST))
{
- content.push_back(p->second);
- ++p;
+ bool moreTopics = false;
+ do
+ {
+ // This record has to be a place holder record, otherwise
+ // there is a database bug.
+ assert(k.id.name.empty() && k.id.category.empty());
+
+ Ice::Identity topic = k.topic;
+
+ bool moreTopics;
+ SubscriberRecordSeq content;
+ while((moreTopics = cursor.get(k, v, MDB_NEXT)) && k.topic == topic)
+ {
+ content.push_back(v);
+ }
+
+ string name = identityToTopicName(topic);
+ installTopic(name, topic, false, content);
+ } while(moreTopics);
}
- string name = identityToTopicName(topic);
- installTopic(name, topic, false, content);
+ txn.commit();
}
}
catch(...)
@@ -360,11 +309,6 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
__setNoDelete(false);
}
-TopicManagerImpl::~TopicManagerImpl()
-{
- //cout << "~TopicManagerImpl" << endl;
-}
-
TopicPrx
TopicManagerImpl::create(const string& name)
{
@@ -378,40 +322,30 @@ TopicManagerImpl::create(const string& name)
throw ex;
}
- // Identity is instanceName>/topic.<topicname>
+ // Identity is <instanceName>/topic.<topicname>
Ice::Identity id = nameToIdentity(_instance, name);
- LogUpdate llu;
- for(;;)
+ LogUpdate llu;
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- SubscriberRecordKey key;
- key.topic = id;
- SubscriberRecord rec;
- rec.link = false;
- rec.cost = 0;
+ SubscriberRecordKey key;
+ key.topic = id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.put(SubscriberMap::value_type(key, rec));
+ _subscriberMap.put(txn, key, rec);
- llu = getLLU(_connection);
- llu.iteration++;
- putLLU(_connection, llu);
+ llu = getIncrementedLLU(txn, _lluMap);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
_instance->observers()->createTopic(llu, name);
@@ -484,47 +418,39 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
// First we update the database state, and then we update our
// internal state.
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- putLLU(_connection, llu);
+ _lluMap.put(txn, lluDbKey, llu);
+
+ _subscriberMap.clear(txn);
+
+ for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
+ {
+ SubscriberRecordKey key;
+ key.topic = p->id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.clear();
+ _subscriberMap.put(txn, key, rec);
- for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
+ for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
{
SubscriberRecordKey key;
key.topic = p->id;
- SubscriberRecord rec;
- rec.link = false;
- rec.cost = 0;
-
- subscriberMap.put(SubscriberMap::value_type(key, rec));
+ key.id = q->id;
- for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
- {
- SubscriberRecordKey key;
- key.topic = p->id;
- key.id = q->id;
-
- subscriberMap.put(SubscriberMap::value_type(key, *q));
- }
+ _subscriberMap.put(txn, key, *q);
}
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
}
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
// We do this with two scans. The first runs through the topics
@@ -586,39 +512,32 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
Lock sync(*this);
Ice::Identity id = nameToIdentity(_instance, name);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
-
- SubscriberRecordKey key;
- key.topic = id;
- SubscriberRecord rec;
- rec.link = false;
- rec.cost = 0;
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- if(subscriberMap.find(key) != subscriberMap.end())
- {
- throw ObserverInconsistencyException("topic exists: " + name);
- }
- subscriberMap.put(SubscriberMap::value_type(key, rec));
+ SubscriberRecordKey key;
+ key.topic = id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
- putLLU(_connection, llu);
-
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
+ if(_subscriberMap.find(txn, key))
{
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
+ throw ObserverInconsistencyException("topic exists: " + name);
}
+ _subscriberMap.put(txn, key, rec);
+
+ _lluMap.put(txn, lluDbKey, llu);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
+
installTopic(name, id, true);
}
@@ -681,48 +600,41 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
reap();
}
- for(;;)
+ try
{
- try
+ content.clear();
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- content.clear();
- for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
- {
- TopicContent rec = p->second->getContent();
- content.push_back(rec);
- }
- llu = getLLU(_connection);
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
+ TopicContent rec = p->second->getContent();
+ content.push_back(rec);
}
+
+ IceDB::ReadOnlyTxn txn(_instance->dbEnv());
+ _lluMap.get(txn, lluDbKey, llu);
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
}
LogUpdate
TopicManagerImpl::getLastLogUpdate() const
{
- for(;;)
+ LogUpdate llu;
+ try
{
- try
- {
- return getLLU(_connection);
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
+ IceDB::ReadOnlyTxn txn(_instance->dbEnv());
+ _lluMap.get(txn, lluDbKey, llu);
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
+
+ return llu;
}
void
@@ -757,31 +669,26 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate&
// elected and gets the latest database state it immediately
// updates the llu stamp.
//
- for(;;)
+ try
{
- try
- {
- content.clear();
+ content.clear();
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
- {
- TopicContent rec = p->second->getContent();
- content.push_back(rec);
- }
- putLLU(_connection, llu);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- halt(_instance->communicator(), ex);
+ TopicContent rec = p->second->getContent();
+ content.push_back(rec);
}
+
+ _lluMap.put(txn, lluDbKey, llu);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
// Now initialize the observers.