summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp104
1 files changed, 53 insertions, 51 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index b3847acf56e..81f404c1b0b 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -8,13 +8,15 @@
// **********************************************************************
#include <IceUtil/DisableWarnings.h>
+#include <Freeze/Freeze.h>
#include <IceStorm/TopicI.h>
#include <IceStorm/Instance.h>
#include <IceStorm/Subscriber.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
-#include <IceStorm/DB.h>
+#include <IceStorm/SubscriberMap.h>
+#include <IceStorm/LLUMap.h>
#include <IceStorm/Util.h>
#include <Ice/LoggerUtil.h>
#include <algorithm>
@@ -22,12 +24,15 @@
using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
+using namespace IceStormInternal;
-using namespace IceDB;
+using namespace Freeze;
namespace
{
+const string subscriberDbName = "subscribers";
+
void
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
@@ -405,7 +410,6 @@ TopicImpl::TopicImpl(
_instance(instance),
_name(name),
_id(id),
- _connectionPool(instance->connectionPool()),
_destroyed(false)
{
try
@@ -650,20 +654,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(
+ _instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = record.id;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->erase(key);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.erase(key);
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -687,21 +691,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->put(key, record);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.put(SubscriberMap::value_type(key, record));
// Update the LLU.
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -772,20 +775,19 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->put(key, record);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.put(SubscriberMap::value_type(key, record));
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -896,20 +898,19 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = id;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->put(key, record);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.put(SubscriberMap::value_type(key, record));
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -1320,19 +1321,18 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->put(key, record);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.put(SubscriberMap::value_type(key, record));
// Update the LLU.
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -1389,7 +1389,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
@@ -1398,13 +1398,10 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
key.topic = _id;
key.id = *id;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->erase(key);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.erase(key);
}
-
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- lluWrapper->put(llu);
-
+ putLLU(connection, llu);
txn.commit();
break;
}
@@ -1485,25 +1482,31 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
// Erase all subscriber records and the topic record.
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->eraseTopic(_id);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+
+ IceStorm::SubscriberRecordKey key;
+ key.topic = _id;
+ SubscriberMap::iterator p = subscriberMap.find(key);
+ while(p != subscriberMap.end() && p->first.topic == key.topic)
+ {
+ subscriberMap.erase(p++);
+ }
// Update the LLU.
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
if(master)
{
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
}
else
{
llu = origLLU;
}
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;
@@ -1560,7 +1563,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
{
try
{
- DatabaseConnectionPtr connection = _connectionPool->newConnection();
+ ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), _instance->serviceName());
TransactionHolder txn(connection);
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
@@ -1569,14 +1572,13 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
key.topic = _id;
key.id = *id;
- SubscribersWrapperPtr subscribersWrapper = _connectionPool->getSubscribers(connection);
- subscribersWrapper->erase(key);
+ SubscriberMap subscriberMap(connection, subscriberDbName);
+ subscriberMap.erase(key);
}
- LLUWrapperPtr lluWrapper = _connectionPool->getLLU(connection);
- llu = lluWrapper->get();
+ llu = getLLU(connection);
llu.iteration++;
- lluWrapper->put(llu);
+ putLLU(connection, llu);
txn.commit();
break;