summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp387
1 files changed, 164 insertions, 223 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 88dc8af7acd..55f52489158 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -13,8 +13,6 @@
#include <IceStorm/TraceLevels.h>
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
-#include <IceStorm/SubscriberMap.h>
-#include <IceStorm/LLUMap.h>
#include <IceStorm/Util.h>
#include <Ice/LoggerUtil.h>
#include <algorithm>
@@ -24,22 +22,14 @@ using namespace IceStorm;
using namespace IceStormElection;
using namespace IceStormInternal;
-using namespace Freeze;
-
namespace
{
-const string subscriberDbName = "subscribers";
-
void
-halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
+logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
{
- {
- Ice::Error error(com->getLogger());
- error << "fatal exception: " << ex << "\n*** Aborting application ***";
- }
-
- abort();
+ Ice::Error error(com->getLogger());
+ error << "LMDB error: " << ex;
}
//
@@ -50,7 +40,7 @@ class PublisherI : public Ice::BlobjectArray
{
public:
- PublisherI(const TopicImplPtr& topic, const InstancePtr& instance) :
+ PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) :
_topic(topic), _instance(instance)
{
}
@@ -80,7 +70,7 @@ public:
private:
const TopicImplPtr _topic;
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
};
//
@@ -91,7 +81,7 @@ class TopicLinkI : public TopicLink
{
public:
- TopicLinkI(const TopicImplPtr& impl, const InstancePtr& instance) :
+ TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
_impl(impl), _instance(instance)
{
}
@@ -106,14 +96,14 @@ public:
private:
const TopicImplPtr _impl;
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
};
class TopicI : public TopicInternal
{
public:
- TopicI(const TopicImplPtr& impl, const InstancePtr& instance) :
+ TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
_impl(impl), _instance(instance)
{
}
@@ -343,26 +333,22 @@ private:
}
const TopicImplPtr _impl;
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
};
}
-namespace IceStorm
-{
-extern string identityToTopicName(const Ice::Identity& id);
-}
-
TopicImpl::TopicImpl(
- const InstancePtr& instance,
+ const PersistentInstancePtr& instance,
const string& name,
const Ice::Identity& id,
const SubscriberRecordSeq& subscribers) :
_instance(instance),
- _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
_name(name),
_id(id),
- _destroyed(false)
+ _destroyed(false),
+ _lluMap(_instance->lluMap()),
+ _subscriberMap(_instance->subscriberMap())
{
try
{
@@ -455,11 +441,6 @@ TopicImpl::TopicImpl(
__setNoDelete(false);
}
-TopicImpl::~TopicImpl()
-{
- //cout << "~TopicImpl" << endl;
-}
-
string
TopicImpl::getName() const
{
@@ -496,7 +477,7 @@ TopicImpl::getNonReplicatedPublisher() const
namespace
{
void
-trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s)
+trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s)
{
out << '[';
for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
@@ -568,34 +549,24 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
LogUpdate llu;
SubscriberPtr subscriber = Subscriber::create(_instance, record);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = subscriber->id();
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = subscriber->id();
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.put(SubscriberMap::value_type(key, record));
+ _subscriberMap.put(txn, key, record);
- llu = getLLU(_connection);
- llu.iteration++;
- putLLU(_connection, llu);
+ llu = getIncrementedLLU(txn, _lluMap);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
_subscribers.push_back(subscriber);
@@ -680,7 +651,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
if(p != _subscribers.end())
{
- string name = identityToTopicName(id);
+ string name = IceStormInternal::identityToTopicName(id);
LinkExists ex;
ex.name = name;
throw ex;
@@ -690,34 +661,24 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
SubscriberPtr subscriber = Subscriber::create(_instance, record);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = id;
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = id;
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.put(SubscriberMap::value_type(key, record));
+ _subscriberMap.put(txn, key, record);
- llu = getLLU(_connection);
- llu.iteration++;
- putLLU(_connection, llu);
+ llu = getIncrementedLLU(txn, _lluMap);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
_subscribers.push_back(subscriber);
@@ -739,7 +700,7 @@ TopicImpl::unlink(const TopicPrx& topic)
vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p == _subscribers.end())
{
- string name = identityToTopicName(id);
+ string name = IceStormInternal::identityToTopicName(id);
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
@@ -814,7 +775,7 @@ TopicImpl::getLinkInfoSeq() const
if(record.link && !(*p)->errored())
{
LinkInfo info;
- info.name = identityToTopicName(record.theTopic->ice_getIdentity());
+ info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
info.cost = record.cost;
info.theTopic = record.theTopic;
seq.push_back(info);
@@ -974,7 +935,7 @@ class TopicInternalReapCB : public IceUtil::Shared
{
public:
- TopicInternalReapCB(const InstancePtr& instance, Ice::Long generation) :
+ TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) :
_instance(instance), _generation(generation)
{
}
@@ -992,7 +953,7 @@ public:
private:
- const InstancePtr _instance;
+ const PersistentInstancePtr _instance;
const Ice::Long _generation;
};
@@ -1112,33 +1073,25 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
}
SubscriberPtr subscriber = Subscriber::create(_instance, record);
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = subscriber->id();
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = subscriber->id();
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.put(SubscriberMap::value_type(key, record));
+ _subscriberMap.put(txn, key, record);
- // Update the LLU.
- putLLU(_connection, llu);
+ // Update the LLU.
+ _lluMap.put(txn, lluDbKey, llu);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
}
_subscribers.push_back(subscriber);
@@ -1165,7 +1118,32 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
IceUtil::Mutex::Lock sync(_subscribersMutex);
- // Remove the subscriber from the subscribers list. If the
+
+ // First remove from the database.
+ try
+ {
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
+
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = *id;
+
+ _subscriberMap.del(txn, key);
+ }
+
+ _lluMap.put(txn, lluDbKey, llu);
+
+ txn.commit();
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
+ }
+
+ // Then remove the subscriber from the subscribers list. If the
// subscriber had a local failure and was removed from the
// subscriber list it could already be gone. That's not a problem.
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
@@ -1177,36 +1155,6 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
_subscribers.erase(p);
}
}
-
- // Next remove from the database.
- for(;;)
- {
- try
- {
- TransactionHolder txn(_connection);
-
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
- {
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = *id;
-
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.erase(key);
- }
- putLLU(_connection, llu);
- txn.commit();
- break;
- }
- catch(const DeadlockException&)
- {
- continue;
- }
- catch(const DatabaseException& ex)
- {
- halt(_instance->communicator(), ex);
- }
- }
}
void
@@ -1259,60 +1207,59 @@ TopicImpl::updateSubscriberObservers()
LogUpdate
TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
{
- _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
- _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
- _instance->topicReaper()->add(_name);
-
- // Destroy each of the subscribers.
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
- {
- (*p)->destroy();
- }
- _subscribers.clear();
// Clear out the database records related to this topic.
LogUpdate llu;
- for(;;)
+ try
{
- try
- {
- TransactionHolder txn(_connection);
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
- // Erase all subscriber records and the topic record.
- SubscriberMap subscriberMap(_connection, subscriberDbName);
+ // Erase all subscriber records and the topic record.
+ SubscriberRecordKey key;
+ key.topic = _id;
- IceStorm::SubscriberRecordKey key;
- key.topic = _id;
- SubscriberMap::iterator p = subscriberMap.find(key);
- while(p != subscriberMap.end() && p->first.topic == key.topic)
- {
- subscriberMap.erase(p++);
- }
+ SubscriberMapRWCursor cursor(_subscriberMap, txn);
+ if(cursor.find(key))
+ {
+ _subscriberMap.del(txn, key);
- // Update the LLU.
- if(master)
+ SubscriberRecordKey k;
+ SubscriberRecord v;
+ while(cursor.get(k, v, MDB_NEXT) && k.topic == key.topic)
{
- llu = getLLU(_connection);
- llu.iteration++;
+ _subscriberMap.del(txn, k);
}
- else
- {
- llu = origLLU;
- }
- putLLU(_connection, llu);
-
- txn.commit();
- break;
}
- catch(const DeadlockException&)
+
+ // Update the LLU.
+ if(master)
{
- continue;
+ llu = getIncrementedLLU(txn, _lluMap);
}
- catch(const DatabaseException& ex)
+ else
{
- halt(_instance->communicator(), ex);
+ llu = origLLU;
+ _lluMap.put(txn, lluDbKey, llu);
}
+
+ txn.commit();
}
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
+ }
+
+ _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
+ _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
+ _instance->topicReaper()->add(_name);
+
+ // Destroy each of the subscribers.
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ (*p)->destroy();
+ }
+ _subscribers.clear();
_instance->topicAdapter()->remove(_id);
@@ -1324,66 +1271,60 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
void
TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
{
- Ice::IdentitySeq removed;
-
- // First remove the subscriber from the subscribers list. Its
- // possible that some of these subscribers have already been
- // removed (consider, for example, a concurrent reap call from two
- // replicas on the same subscriber). To avoid sending unnecessary
- // observer updates keep track of the observers that are actually
- // removed.
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
- {
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
- if(p != _subscribers.end())
- {
- (*p)->destroy();
- _subscribers.erase(p);
- removed.push_back(*id);
- }
- }
+ // First update the database
- // If there is no further work to do we are done.
- if(removed.empty())
- {
- return;
- }
-
- // Next update the database and send the notification to any
- // slaves.
LogUpdate llu;
- for(;;)
+ bool found = false;
+ try
{
- try
+ IceDB::ReadWriteTxn txn(_instance->dbEnv());
+
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
{
- TransactionHolder txn(_connection);
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = *id;
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ if(_subscriberMap.del(txn, key))
{
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = *id;
-
- SubscriberMap subscriberMap(_connection, subscriberDbName);
- subscriberMap.erase(key);
+ found = true;
}
+ }
- llu = getLLU(_connection);
- llu.iteration++;
- putLLU(_connection, llu);
-
+ if(found)
+ {
+ llu = getIncrementedLLU(txn, _lluMap);
txn.commit();
- break;
}
- catch(const DeadlockException&)
+ else
{
- continue;
+ txn.rollback();
}
- catch(const DatabaseException& ex)
+ }
+ catch(const IceDB::LMDBException& ex)
+ {
+ logError(_instance->communicator(), ex);
+ throw; // will become UnknownException in caller
+ }
+
+ if(found)
+ {
+ // Then remove the subscriber from the subscribers list. Its
+ // possible that some of these subscribers have already been
+ // removed (consider, for example, a concurrent reap call from two
+ // replicas on the same subscriber). To avoid sending unnecessary
+ // observer updates keep track of the observers that are actually
+ // removed.
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
{
- halt(_instance->communicator(), ex);
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
+ if(p != _subscribers.end())
+ {
+ (*p)->destroy();
+ _subscribers.erase(p);
+ }
}
- }
- _instance->observers()->removeSubscriber(llu, _name, ids);
+ _instance->observers()->removeSubscriber(llu, _name, ids);
+ }
}