summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
committerDwayne Boone <dwayne@zeroc.com>2009-09-28 11:05:44 -0230
commit7d20430028f05cc26c412465176a75ce4ea5af9e (patch)
tree593695acf366f7e3a7081d15af8f474683ce4af7 /cpp/src/IceStorm/TopicManagerI.cpp
parentRemoved unused __checkTwoway(const char*) from Proxy (diff)
downloadice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.bz2
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.tar.xz
ice-7d20430028f05cc26c412465176a75ce4ea5af9e.zip
Bug 3231 - alternative storage for IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp158
1 files changed, 93 insertions, 65 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 56d33b07a15..3a065b6498d 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -12,13 +12,13 @@
#include <IceStorm/TopicI.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/Instance.h>
-#include <Freeze/Freeze.h>
-
#include <IceStorm/NodeI.h>
#include <IceStorm/Observers.h>
#include <IceStorm/Subscriber.h>
-
#include <Ice/SliceChecksums.h>
+#ifdef QTSQL
+# include <Ice/Instance.h>
+#endif
#include <functional>
@@ -26,15 +26,24 @@ using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
+#ifdef QTSQL
+using namespace IceSQL;
+#else
+using namespace Freeze;
+#endif
+
namespace
{
void
-halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex)
+halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
{
{
Ice::Error error(com->getLogger());
error << "fatal exception: " << ex << "\n*** Aborting application ***";
+#ifdef __GNUC__
+ error << "\n" << ex.ice_stackTrace();
+#endif
}
abort();
@@ -288,11 +297,8 @@ nameToIdentity(const InstancePtr& instance, const string& name)
}
TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
-
_instance(instance),
- _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
- _llumap(_connection, "llu"),
- _subscriberMap(_connection, "subscribers")
+ _databaseCache(new IceStorm::DatabaseCache(instance))
{
try
{
@@ -314,17 +320,22 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
_sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
}
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+
// Ensure that the llu counter is present in the log.
- LLUMap::const_iterator ci = _llumap.find("_manager");
- if(ci == _llumap.end())
- {
- LogUpdate empty = {0, 0};
- _llumap.put(LLUMap::value_type("_manager", empty));
- }
+ LLUWrapper llumap(_databaseCache, connection);
+ LogUpdate empty = {0, 0};
+ llumap.put(empty);
// Recreate each of the topics.
- SubscriberMap::const_iterator p = _subscriberMap.begin();
- while(p != _subscriberMap.end())
+ SubscriberMapWrapper subWrap(_databaseCache, connection);
+#ifdef QTSQL
+ SubscriberMap subscriberMap = subWrap.getMap();
+#else
+ SubscriberMap& subscriberMap = subWrap.getMap();
+#endif
+ SubscriberMap::const_iterator p = subscriberMap.begin();
+ while(p != subscriberMap.end())
{
// This record has to be a place holder record, otherwise
// there is a database bug.
@@ -336,7 +347,7 @@ TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
++p;
SubscriberRecordSeq content;
- while(p != _subscriberMap.end() && p->first.topic == topic)
+ while(p != subscriberMap.end() && p->first.topic == topic)
{
content.push_back(p->second);
++p;
@@ -381,28 +392,31 @@ TopicManagerImpl::create(const string& name)
{
try
{
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
- Freeze::TransactionHolder txn(_connection);
SubscriberRecordKey key;
key.topic = id;
SubscriberRecord rec;
rec.link = false;
rec.cost = 0;
- _subscriberMap.put(SubscriberMap::value_type(key, rec));
- LLUMap::iterator ci = _llumap.find("_manager");
- assert(ci != _llumap.end());
- llu = ci->second;
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.put(key, rec);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
llu.iteration++;
- ci.set(llu);
+ llumap.put(llu);
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -479,10 +493,14 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
{
try
{
- Freeze::TransactionHolder txn(_connection);
- _subscriberMap.clear();
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
- _llumap.put(LLUMap::value_type("_manager", llu));
+ LLUWrapper llumap(_databaseCache, connection);
+ llumap.put(llu);
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ subscriberMap.clear();
for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
{
@@ -491,24 +509,26 @@ TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& cont
SubscriberRecord rec;
rec.link = false;
rec.cost = 0;
- _subscriberMap.put(SubscriberMap::value_type(key, rec));
+
+ subscriberMap.put(key, rec);
+
for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
{
SubscriberRecordKey key;
key.topic = p->id;
key.id = q->id;
- _subscriberMap.put(SubscriberMap::value_type(key, *q));
+
+ subscriberMap.put(key, *q);
}
}
-
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -577,27 +597,37 @@ TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
{
try
{
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
+
SubscriberRecordKey key;
key.topic = id;
SubscriberRecord rec;
rec.link = false;
rec.cost = 0;
- SubscriberMap::const_iterator q = _subscriberMap.find(key);
- if(q != _subscriberMap.end())
+
+ SubscriberMapWrapper subscriberMap(_databaseCache, connection);
+ try
{
+ subscriberMap.find(key);
throw ObserverInconsistencyException("topic exists: " + name);
}
- _subscriberMap.put(SubscriberMap::value_type(key, rec));
- _llumap.put(LLUMap::value_type("_manager", llu));
+ catch(const NotFoundException&)
+ {
+ }
+ subscriberMap.put(key, rec);
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llumap.put(llu);
+
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -664,34 +694,28 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
reap();
}
- // Reads are not synchronized and therefore must use a separate
- // connection.
- const Freeze::ConnectionPtr connection = Freeze::createConnection(_instance->communicator(),
- _instance->serviceName());
- const LLUMap llumap(connection, "llu");
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
for(;;)
{
try
{
content.clear();
- Freeze::TransactionHolder txn(connection);
for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
TopicContent rec = p->second->getContent();
content.push_back(rec);
}
-
- LLUMap::const_iterator ci = llumap.find("_manager");
- assert(ci != llumap.end());
- llu = ci->second;
+
+ LLUWrapper llumap(_databaseCache, connection);
+ llu = llumap.get();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -701,22 +725,20 @@ TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
LogUpdate
TopicManagerImpl::getLastLogUpdate() const
{
- const Freeze::ConnectionPtr connection = Freeze::createConnection(
- _instance->communicator(), _instance->serviceName());
- const LLUMap llumap(connection, "llu");
+ DatabaseConnectionPtr connection = _databaseCache->newConnection();
for(;;)
{
try
{
- LLUMap::const_iterator ci = llumap.find("_manager");
- return ci->second;
+ LLUWrapper llumap(_databaseCache, connection);
+ return llumap.get();
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -761,24 +783,26 @@ TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate&
{
content.clear();
- Freeze::TransactionHolder txn(_connection);
+ DatabaseConnectionPtr connection = _databaseCache->getConnection();
+ TransactionHolder txn(connection);
+
for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
TopicContent rec = p->second->getContent();
content.push_back(rec);
}
- LLUMap::iterator ci = _llumap.find("_manager");
- ci.set(llu);
+ LLUWrapper llumap(_databaseCache, connection);
+ llumap.put(llu);
txn.commit();
break;
}
- catch(const Freeze::DeadlockException&)
+ catch(const DeadlockException&)
{
continue;
}
- catch(const Freeze::DatabaseException& ex)
+ catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
}
@@ -868,7 +892,11 @@ TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool
}
// Create topic implementation
- TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
+#ifdef QTSQL
+ TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, _databaseCache);
+#else
+ TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers, new IceStorm::DatabaseCache(_instance));
+#endif
// The identity is the name of the Topic.
_topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));