summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp99
1 files changed, 45 insertions, 54 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 77b6ae00542..8c0dd7f602d 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -14,19 +14,15 @@
#include <IceStorm/TraceLevels.h>
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
+#include <IceStorm/DB.h>
#include <Ice/LoggerUtil.h>
-
#include <algorithm>
using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
-#ifdef QTSQL
-using namespace IceSQL;
-#else
-using namespace Freeze;
-#endif
+using namespace IceDB;
namespace
{
@@ -37,9 +33,6 @@ halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
Ice::Error error(com->getLogger());
error << "fatal exception: " << ex << "\n*** Aborting application ***";
-#ifdef __GNUC__
- error << "\n" << ex.ice_stackTrace();
-#endif
}
abort();
@@ -406,13 +399,11 @@ TopicImpl::TopicImpl(
const InstancePtr& instance,
const string& name,
const Ice::Identity& id,
- const SubscriberRecordSeq& subscribers,
- const IceStorm::DatabaseCachePtr& databaseCache
- ) :
+ const SubscriberRecordSeq& subscribers) :
_instance(instance),
_name(name),
_id(id),
- _databaseCache(databaseCache),
+ _databaseCache(instance->databaseCache()),
_destroyed(false)
{
try
@@ -651,8 +642,6 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
LogUpdate llu;
- DatabaseConnectionPtr connection = _databaseCache->getConnection();
-
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
if(p != _subscribers.end())
{
@@ -665,19 +654,20 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = record.id;
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.erase(key);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->erase(key);
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -701,20 +691,21 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.put(key, record);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->put(key, record);
// Update the LLU.
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -790,13 +781,13 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
key.topic = _id;
key.id = subscriber->id();
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.put(key, record);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->put(key, record);
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -912,13 +903,13 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
key.topic = _id;
key.id = id;
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.put(key, record);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->put(key, record);
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -1309,12 +1300,12 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
key.topic = _id;
key.id = subscriber->id();
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.put(key, record);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->put(key, record);
// Update the LLU.
- LLUWrapper llumap(_databaseCache, connection);
- llumap.put(llu);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -1380,12 +1371,12 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
key.topic = _id;
key.id = *id;
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.erase(key);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->erase(key);
}
- LLUWrapper llumap(_databaseCache, connection);
- llumap.put(llu);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -1451,21 +1442,21 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
TransactionHolder txn(connection);
// Erase all subscriber records and the topic record.
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.eraseTopic(_id);
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->eraseTopic(_id);
// Update the LLU.
- LLUWrapper llumap(_databaseCache, connection);
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
if(master)
{
- llu = llumap.get();
+ llu = lluWrapper->get();
llu.iteration++;
}
else
{
llu = origLLU;
}
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;
@@ -1530,15 +1521,15 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
SubscriberRecordKey key;
key.topic = _id;
key.id = *id;
-
- SubscriberMapWrapper subscriberMap(_databaseCache, connection);
- subscriberMap.erase(key);
+
+ SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
+ subscribersWrapper->erase(key);
}
- LLUWrapper llumap(_databaseCache, connection);
- llu = llumap.get();
+ LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
+ llu = lluWrapper->get();
llu.iteration++;
- llumap.put(llu);
+ lluWrapper->put(llu);
txn.commit();
break;