summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp194
1 files changed, 106 insertions, 88 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index e799b5fe532..77b6ae00542 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -14,26 +14,32 @@
#include <IceStorm/TraceLevels.h>
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
-
#include <Ice/LoggerUtil.h>
-#include <Freeze/Freeze.h>
-
#include <algorithm>
using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
+#ifdef QTSQL
+using namespace IceSQL;
+#else
+using namespace Freeze;
+#endif
+
namespace
{
void
-halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex)
+halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
{
Ice::Error error(com->getLogger());
error << "fatal exception: " << ex << "\n*** Aborting application ***";
+#ifdef __GNUC__
+ error << "\n" << ex.ice_stackTrace();
+#endif
}
abort();
@@ -400,18 +406,19 @@ TopicImpl::TopicImpl(
const InstancePtr& instance,
const string& name,
const Ice::Identity& id,
- const SubscriberRecordSeq& subscribers) :
+ const SubscriberRecordSeq& subscribers,
+ const IceStorm::DatabaseCachePtr& databaseCache
+ ) :
_instance(instance),
_name(name),
_id(id),
- _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
- _subscriberMap(_connection, "subscribers"),
- _llumap(_connection, "llu"),
+ _databaseCache(databaseCache),
_destroyed(false)
{
try
{
__setNoDelete(true);
+
// TODO: If we want to improve the performance of the
// non-replicated case we could allocate a null-topic impl here.
_servant = new TopicI(this, instance);
@@ -644,6 +651,8 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
LogUpdate llu;
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
if(p != _subscribers.end())
{
@@ -656,28 +665,28 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ TransactionHolder txn(connection);
+
SubscriberRecordKey key;
key.topic = _id;
key.id = record.id;
- SubscriberMap::iterator e = _subscriberMap.find(key);
- if(e != _subscriberMap.end())
- {
- _subscriberMap.erase(e);
- }
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.erase(key);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -692,25 +701,29 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ TransactionHolder txn(connection);
+
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- _subscriberMap.put(SubscriberMap::value_type(key, record));
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.put(key, record);
+
// Update the LLU.
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -770,26 +783,29 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- _subscriberMap.put(SubscriberMap::value_type(key, record));
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.put(key, record);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -889,26 +905,29 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = id;
- _subscriberMap.put(SubscriberMap::value_type(key, record));
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.put(key, record);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -1283,25 +1302,28 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
SubscriberRecordKey key;
key.topic = _id;
key.id = subscriber->id();
- _subscriberMap.put(SubscriberMap::value_type(key, record));
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.put(key, record);
// Update the LLU.
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- ci.set(llu);
+ LLUWrapper llumap(_databaseCache, connection);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -1349,29 +1371,30 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
+
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
{
SubscriberRecordKey key;
key.topic = _id;
key.id = *id;
- SubscriberMap::iterator e = _subscriberMap.find(key);
- if(e != _subscriberMap.end())
- {
- _subscriberMap.erase(e);
- }
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.erase(key);
}
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- ci.set(llu);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -1424,38 +1447,34 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
{
try
{
- SubscriberRecordKey key;
- key.topic = _id;
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
- Freeze::TransactionHolder txn(_connection);
// Erase all subscriber records and the topic record.
- SubscriberMap::iterator p = _subscriberMap.find(key);
- while(p != _subscriberMap.end() && p->first.topic == key.topic)
- {
- _subscriberMap.erase(p++);
- }
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.eraseTopic(_id);
// Update the LLU.
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
+ LLUWrapper llumap(_databaseCache, connection);
if(master)
{
- llu = ci->second;
+ llu = llumap.get();
llu.iteration++;
}
else
{
llu = origLLU;
}
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -1503,33 +1522,32 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
{
SubscriberRecordKey key;
key.topic = _id;
key.id = *id;
- SubscriberMap::iterator e = _subscriberMap.find(key);
- if(e != _subscriberMap.end())
- {
- _subscriberMap.erase(e);
- }
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.erase(key);
}
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}