diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 158 |
1 files changed, 93 insertions, 65 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 56d33b07a15..3a065b6498d 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -12,13 +12,13 @@ #include <IceStorm/TopicI.h> #include <IceStorm/TraceLevels.h> #include <IceStorm/Instance.h> -#include <Freeze/Freeze.h> - #include <IceStorm/NodeI.h> #include <IceStorm/Observers.h> #include <IceStorm/Subscriber.h> - #include <Ice/SliceChecksums.h> +#ifdef QTSQL +# include <Ice/Instance.h> +#endif #include <functional> @@ -26,15 +26,24 @@ using namespace std; using namespace IceStorm; using namespace IceStormElection; +#ifdef QTSQL +using namespace IceSQL; +#else +using namespace Freeze; +#endif + namespace { void -halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex) +halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex) { { Ice::Error error(com->getLogger()); error << "fatal exception: " << ex << "\n*** Aborting application ***"; +#ifdef __GNUC__ + error << "\n" << ex.ice_stackTrace(); +#endif } abort(); @@ -288,11 +297,8 @@ nameToIdentity(const InstancePtr& instance, const string& name) } TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : - _instance(instance), - _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())), - _llumap(_connection, "llu"), - _subscriberMap(_connection, "subscribers") + _databaseCache(new IceStorm::DatabaseCache(instance)) { try { @@ -314,17 +320,22 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl); } + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + // Ensure that the llu counter is present in the log. - LLUMap::const_iterator ci = _llumap.find("_manager"); - if(ci == _llumap.end()) - { - LogUpdate empty = {0, 0}; - _llumap.put(LLUMap::value_type("_manager", empty)); - } + LLUWrapper llumap(_databaseCache, connection); + LogUpdate empty = {0, 0}; + llumap.put(empty); // Recreate each of the topics. - SubscriberMap::const_iterator p = _subscriberMap.begin(); - while(p != _subscriberMap.end()) + SubscriberMapWrapper subWrap(_databaseCache, connection); +#ifdef QTSQL + SubscriberMap subscriberMap = subWrap.getMap(); +#else + SubscriberMap& subscriberMap = subWrap.getMap(); +#endif + SubscriberMap::const_iterator p = subscriberMap.begin(); + while(p != subscriberMap.end()) { // This record has to be a place holder record, otherwise // there is a database bug. @@ -336,7 +347,7 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : ++p; SubscriberRecordSeq content; - while(p != _subscriberMap.end() && p->first.topic == topic) + while(p != subscriberMap.end() && p->first.topic == topic) { content.push_back(p->second); ++p; @@ -381,28 +392,31 @@ TopicManagerImpl::create(const string& name) { try { + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); - Freeze::TransactionHolder txn(_connection); SubscriberRecordKey key; key.topic = id; SubscriberRecord rec; rec.link = false; rec.cost = 0; - _subscriberMap.put(SubscriberMap::value_type(key, rec)); - LLUMap::iterator ci = _llumap.find("_manager"); - assert(ci != _llumap.end()); - llu = ci->second; + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.put(key, rec); + + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); llu.iteration++; - ci.set(llu); + llumap.put(llu); txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -479,10 +493,14 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont { try { - Freeze::TransactionHolder txn(_connection); - _subscriberMap.clear(); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); - _llumap.put(LLUMap::value_type("_manager", llu)); + LLUWrapper llumap(_databaseCache, connection); + llumap.put(llu); + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + subscriberMap.clear(); for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) { @@ -491,24 +509,26 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont SubscriberRecord rec; rec.link = false; rec.cost = 0; - _subscriberMap.put(SubscriberMap::value_type(key, rec)); + + subscriberMap.put(key, rec); + for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) { SubscriberRecordKey key; key.topic = p->id; key.id = q->id; - _subscriberMap.put(SubscriberMap::value_type(key, *q)); + + subscriberMap.put(key, *q); } } - txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -577,27 +597,37 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) { try { - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); + SubscriberRecordKey key; key.topic = id; SubscriberRecord rec; rec.link = false; rec.cost = 0; - SubscriberMap::const_iterator q = _subscriberMap.find(key); - if(q != _subscriberMap.end()) + + SubscriberMapWrapper subscriberMap(_databaseCache, connection); + try { + subscriberMap.find(key); throw ObserverInconsistencyException("topic exists: " + name); } - _subscriberMap.put(SubscriberMap::value_type(key, rec)); - _llumap.put(LLUMap::value_type("_manager", llu)); + catch(const NotFoundException&) + { + } + subscriberMap.put(key, rec); + + LLUWrapper llumap(_databaseCache, connection); + llumap.put(llu); + txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -664,34 +694,28 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) reap(); } - // Reads are not synchronized and therefore must use a separate - // connection. - const Freeze::ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), - _instance->serviceName()); - const LLUMap llumap(connection, "llu"); + DatabaseConnectionPtr connection = _databaseCache->newConnection(); for(;;) { try { content.clear(); - Freeze::TransactionHolder txn(connection); for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { TopicContent rec = p->second->getContent(); content.push_back(rec); } - - LLUMap::const_iterator ci = llumap.find("_manager"); - assert(ci != llumap.end()); - llu = ci->second; + + LLUWrapper llumap(_databaseCache, connection); + llu = llumap.get(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -701,22 +725,20 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) LogUpdate TopicManagerImpl::getLastLogUpdate() const { - const Freeze::ConnectionPtr connection = Freeze::createConnection( - _instance->communicator(), _instance->serviceName()); - const LLUMap llumap(connection, "llu"); + DatabaseConnectionPtr connection = _databaseCache->newConnection(); for(;;) { try { - LLUMap::const_iterator ci = llumap.find("_manager"); - return ci->second; + LLUWrapper llumap(_databaseCache, connection); + return llumap.get(); } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -761,24 +783,26 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& { content.clear(); - Freeze::TransactionHolder txn(_connection); + DatabaseConnectionPtr connection = _databaseCache->getConnection(); + TransactionHolder txn(connection); + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { TopicContent rec = p->second->getContent(); content.push_back(rec); } - LLUMap::iterator ci = _llumap.find("_manager"); - ci.set(llu); + LLUWrapper llumap(_databaseCache, connection); + llumap.put(llu); txn.commit(); break; } - catch(const Freeze::DeadlockException&) + catch(const DeadlockException&) { continue; } - catch(const Freeze::DatabaseException& ex) + catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); } @@ -868,7 +892,11 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool } // Create topic implementation - TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers); +#ifdef QTSQL + TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, _databaseCache); +#else + TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, new IceStorm::DatabaseCache(_instance)); +#endif // The identity is the name of the Topic. _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl)); |