summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2014-04-18 18:31:48 +0200
committerJose <jose@zeroc.com>2014-04-18 18:31:48 +0200
commite7333297345efda9379045495d17aadb571ddd50 (patch)
tree5bfa86a78d29665e1ef50a9575b76114be1145e1 /cpp/src/IceStorm/TopicManagerI.cpp
parentFixed replicaGroup test issue (diff)
downloadice-e7333297345efda9379045495d17aadb571ddd50.tar.bz2
ice-e7333297345efda9379045495d17aadb571ddd50.tar.xz
ice-e7333297345efda9379045495d17aadb571ddd50.zip
Fixed (ICE-4858) - Eliminate IceDB
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp83
1 files changed, 35 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 9ea638b877e..70150c848bc 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -8,6 +8,7 @@
// **********************************************************************
#include <IceUtil/DisableWarnings.h>
+#include <Freeze/Freeze.h>
#include <IceStorm/TopicManagerI.h>
#include <IceStorm/TopicI.h>
#include <IceStorm/TraceLevels.h>
@@ -15,7 +16,8 @@
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
#include <IceStorm/Subscriber.h>
-#include <IceStorm/DB.h>
+#include <IceStorm/SubscriberMap.h>
+#include <IceStorm/LLUMap.h>
#include <IceStorm/Util.h>
#include <Ice/SliceChecksums.h>
@@ -24,12 +26,15 @@
using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
+using namespace IceStormInternal;
-using namespace IceDB;
+using namespace Freeze;
namespace
{
+const string subscriberDbName = "subscribers";
+
void
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
@@ -289,8 +294,7 @@ nameToIdentity(const InstancePtr& instance, const string& name)
}
TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
- _instance(instance),
- _connectionPool(instance->connectionPool())
+ _instance(instance)
{
try
{
@@ -317,18 +321,15 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
_sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
}
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
// Ensure that the llu counter is present in the log.
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
LogUpdate empty = {0, 0};
- lluWrapper->put(empty);
+ putLLU(connection, empty);
// Recreate each of the topics.
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- map<SubscriberRecordKey, SubscriberRecord> subscriberMap = subscribersWrapper->getMap();
-
- map<SubscriberRecordKey, SubscriberRecord>::const_iterator p = subscriberMap.begin();
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ SubscriberMap::const_iterator p = subscriberMap.begin();
while(p != subscriberMap.end())
{
// This record has to be a place holder record, otherwise
@@ -386,7 +387,7 @@ TopicManagerImpl::create(const string& name)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
@@ -395,13 +396,12 @@ TopicManagerImpl::create(const string& name)
rec.link = false;
rec.cost = 0;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->put(key, rec);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.put(SubscriberMap::value_type(key, rec));
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -417,7 +417,6 @@ TopicManagerImpl::create(const string& name)
}
_instance->observers()->createTopic(llu, name);
-
return installTopic(name, id, true);
}
@@ -491,14 +490,13 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- lluWrapper->put(llu);
+ putLLU(connection, llu);
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->clear();
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.clear();
for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
{
@@ -508,7 +506,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
rec.link = false;
rec.cost = 0;
- subscribersWrapper->put(key, rec);
+ subscriberMap.put(SubscriberMap::value_type(key, rec));
for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
{
@@ -516,7 +514,7 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
key.topic = p->id;
key.id = q->id;
- subscribersWrapper->put(key, *q);
+ subscriberMap.put(SubscriberMap::value_type(key, *q));
}
}
txn.commit();
@@ -595,7 +593,7 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
@@ -604,19 +602,14 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
rec.link = false;
rec.cost = 0;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- try
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ if(subscriberMap.find(key) != subscriberMap.end())
{
- subscribersWrapper->find(key);
throw ObserverInconsistencyException("topic exists: " + name);
}
- catch(const NotFoundException&)
- {
- }
- subscribersWrapper->put(key, rec);
+ subscriberMap.put(SubscriberMap::value_type(key, rec));
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -692,7 +685,7 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
reap();
}
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
for(;;)
{
@@ -704,9 +697,7 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
TopicContent rec = p->second->getContent();
content.push_back(rec);
}
-
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
break;
}
catch(const DeadlockException&)
@@ -716,21 +707,20 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
}
LogUpdate
TopicManagerImpl::getLastLogUpdate() const
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
for(;;)
{
try
{
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- return lluWrapper->get();
+ return getLLU(connection);
}
catch(const DeadlockException&)
{
@@ -781,7 +771,7 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate&
{
content.clear();
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
@@ -789,10 +779,7 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate&
TopicContent rec = p->second->getContent();
content.push_back(rec);
}
-
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- lluWrapper->put(llu);
-
+ putLLU(connection, llu);
txn.commit();
break;
}