summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp80
1 files changed, 32 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 3a065b6498d..56e166031a8 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -15,10 +15,8 @@
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
#include <IceStorm/Subscriber.h>
+#include <IceStorm/DB.h>
#include <Ice/SliceChecksums.h>
-#ifdef QTSQL
-# include <Ice/Instance.h>
-#endif
#include <functional>
@@ -26,11 +24,7 @@ using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
-#ifdef QTSQL
-using namespace IceSQL;
-#else
-using namespace Freeze;
-#endif
+using namespace IceDB;
namespace
{
@@ -41,9 +35,6 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
Ice::Error error(com->getLogger());
error << "fatal exception: " << ex << "\n*** Aborting application ***";
-#ifdef __GNUC__
- error << "\n" << ex.ice_stackTrace();
-#endif
}
abort();
@@ -298,7 +289,7 @@ nameToIdentity(const InstancePtr& instance, const string& name)
TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
_instance(instance),
- _databaseCache(new IceStorm::DatabaseCache(instance))
+ _databaseCache(instance->databaseCache())
{
try
{
@@ -323,18 +314,15 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
DatabaseConnectionPtr connection = _databaseCache->getConnection();
// Ensure that the llu counter is present in the log.
- LLUWrapper llumap(_databaseCache, connection);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
LogUpdate empty = {0, 0};
- llumap.put(empty);
+ lluWrapper->put(empty);
// Recreate each of the topics.
- SubscriberMapWrapper subWrap(_databaseCache, connection);
-#ifdef QTSQL
- SubscriberMap subscriberMap = subWrap.getMap();
-#else
- SubscriberMap& subscriberMap = subWrap.getMap();
-#endif
- SubscriberMap::const_iterator p = subscriberMap.begin();
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ map<SubscriberRecordKey, SubscriberRecord> subscriberMap = subscribersWrapper->getMap();
+
+ map<SubscriberRecordKey, SubscriberRecord>::const_iterator p = subscriberMap.begin();
while(p != subscriberMap.end())
{
// This record has to be a place holder record, otherwise
@@ -401,13 +389,13 @@ TopicManagerImpl::create(const string& name)
rec.link = false;
rec.cost = 0;
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.put(key, rec);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->put(key, rec);
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -496,11 +484,11 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
DatabaseConnectionPtr connection = _databaseCache->getConnection();
TransactionHolder txn(connection);
- LLUWrapper llumap(_databaseCache, connection);
- llumap.put(llu);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ lluWrapper->put(llu);
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.clear();
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->clear();
for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
{
@@ -510,7 +498,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
rec.link = false;
rec.cost = 0;
- subscriberMap.put(key, rec);
+ subscribersWrapper->put(key, rec);
for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
{
@@ -518,7 +506,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
key.topic = p->id;
key.id = q->id;
- subscriberMap.put(key, *q);
+ subscribersWrapper->put(key, *q);
}
}
txn.commit();
@@ -606,19 +594,19 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
rec.link = false;
rec.cost = 0;
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
try
{
- subscriberMap.find(key);
+ subscribersWrapper->find(key);
throw ObserverInconsistencyException("topic exists: " + name);
}
catch(const NotFoundException&)
{
}
- subscriberMap.put(key, rec);
+ subscribersWrapper->put(key, rec);
- LLUWrapper llumap(_databaseCache, connection);
- llumap.put(llu);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -707,8 +695,8 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
content.push_back(rec);
}
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
break;
}
catch(const DeadlockException&)
@@ -731,8 +719,8 @@ TopicManagerImpl::getLastLogUpdate() const
{
try
{
- LLUWrapper llumap(_databaseCache, connection);
- return llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ return lluWrapper->get();
}
catch(const DeadlockException&)
{
@@ -792,8 +780,8 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate&
content.push_back(rec);
}
- LLUWrapper llumap(_databaseCache, connection);
- llumap.put(llu);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -892,11 +880,7 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool
}
// Create topic implementation
-#ifdef QTSQL
- TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, _databaseCache);
-#else
- TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, new IceStorm::DatabaseCache(_instance));
-#endif
+ TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
// The identity is the name of the Topic.
_topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));