summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp817
1 files changed, 729 insertions, 88 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index f5bc398b5d3..077a77e0f2a 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -10,19 +10,248 @@
#include <IceUtil/DisableWarnings.h>
#include <IceStorm/TopicManagerI.h>
#include <IceStorm/TopicI.h>
-#include <IceStorm/SubscriberPool.h>
-#include <IceStorm/BatchFlusher.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/Instance.h>
-#include <Freeze/Initialize.h>
+#include <Freeze/Freeze.h>
+
+#include <IceStorm/NodeI.h>
+#include <IceStorm/Observers.h>
#include <Ice/SliceChecksums.h>
#include <functional>
-#include <ctype.h>
-using namespace IceStorm;
using namespace std;
+using namespace IceStorm;
+using namespace IceStormElection;
+
+namespace
+{
+
+void
+halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex)
+{
+ {
+ Ice::Error error(com->getLogger());
+ error << "fatal exception: " << ex << "\n*** Aborting application ***";
+ }
+
+ abort();
+}
+
+class TopicManagerI : public TopicManagerInternal
+{
+public:
+
+ TopicManagerI(const InstancePtr& instance, const TopicManagerImplPtr& impl) :
+ _instance(instance), _impl(impl)
+ {
+ }
+
+ ~TopicManagerI()
+ {
+ //cout << "~TopicManagerI" << endl;
+ }
+
+ virtual TopicPrx create(const string& id, const Ice::Current&)
+ {
+ while(true)
+ {
+ Ice::Long generation;
+ TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ return master->create(id);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ return _impl->create(id);
+ }
+ }
+ }
+
+ virtual TopicPrx retrieve(const string& id, const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->retrieve(id);
+ }
+
+ virtual TopicDict retrieveAll(const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->retrieveAll();
+ }
+
+ virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current&) const
+ {
+ // This doesn't require the replication to be running.
+ return Ice::sliceChecksums();
+ }
+
+ virtual NodePrx getReplicaNode(const Ice::Current&) const
+ {
+ // This doesn't require the replication to be running.
+ return _instance->nodeProxy();
+ }
+
+private:
+
+ TopicManagerPrx getMaster(Ice::Long& generation, const char* file, int line) const
+ {
+ NodeIPtr node = _instance->node();
+ if(node)
+ {
+ return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
+ }
+ else
+ {
+ return TopicManagerPrx();
+ }
+ }
+
+ const InstancePtr _instance;
+ const TopicManagerImplPtr _impl;
+};
+
+class ReplicaObserverI : public ReplicaObserver
+{
+public:
+
+ ReplicaObserverI(const InstancePtr& instance, const TopicManagerImplPtr& impl) :
+ _instance(instance),
+ _impl(impl)
+ {
+ }
+
+ ~ReplicaObserverI()
+ {
+ //cout << "~ReplicaObserverI" << endl;
+ }
+
+ virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&)
+ {
+ NodeIPtr node = _instance->node();
+ if(node)
+ {
+ node->checkObserverInit(llu.generation);
+ }
+ _impl->observerInit(llu, content);
+ }
+
+ virtual void createTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
+ {
+ try
+ {
+ ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
+ //cout << "createTopic: " << llu.generation << " node generation: " << unlock.generation() << endl;
+ _impl->observerCreateTopic(llu, name);
+ }
+ catch(const ObserverInconsistencyException& e)
+ {
+ Ice::Warning warn(_instance->traceLevels()->logger);
+ warn << "ReplicaObserverI::create: ObserverInconsistencyException: " << e.reason;
+ _instance->node()->recovery(llu.generation);
+ throw;
+ }
+ }
+
+ virtual void destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
+ {
+ try
+ {
+ ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
+ _impl->observerDestroyTopic(llu, name);
+ }
+ catch(const ObserverInconsistencyException& e)
+ {
+ Ice::Warning warn(_instance->traceLevels()->logger);
+ warn << "ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.reason;
+ _instance->node()->recovery(llu.generation);
+ throw;
+ }
+ }
+
+ virtual void addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec,
+ const Ice::Current&)
+ {
+ try
+ {
+ ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
+ _impl->observerAddSubscriber(llu, name, rec);
+ }
+ catch(const ObserverInconsistencyException& e)
+ {
+ Ice::Warning warn(_instance->traceLevels()->logger);
+ warn << "ReplicaObserverI::add: ObserverInconsistencyException: " << e.reason;
+ _instance->node()->recovery(llu.generation);
+ throw;
+ }
+ }
+
+ virtual void removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id,
+ const Ice::Current&)
+ {
+ try
+ {
+ ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
+ _impl->observerRemoveSubscriber(llu, name, id);
+ }
+ catch(const ObserverInconsistencyException& e)
+ {
+ Ice::Warning warn(_instance->traceLevels()->logger);
+ warn << "ReplicaObserverI::remove: ObserverInconsistencyException: " << e.reason;
+ _instance->node()->recovery(llu.generation);
+ throw;
+ }
+ }
+
+private:
+
+ const InstancePtr _instance;
+ const TopicManagerImplPtr _impl;
+};
+
+class TopicManagerSyncI : public TopicManagerSync
+{
+public:
+
+ TopicManagerSyncI(const TopicManagerImplPtr& impl) :
+ _impl(impl)
+ {
+ }
+
+ ~TopicManagerSyncI()
+ {
+ //cout << "~TopicManagerSyncI" << endl;
+ }
+
+ virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&)
+ {
+ _impl->getContent(llu, content);
+ }
+
+private:
+
+ const TopicManagerImplPtr _impl;
+};
+
+}
namespace IceStorm
{
@@ -44,40 +273,99 @@ identityToTopicName(const Ice::Identity& id)
return id.name.substr(6);
}
+Ice::Identity
+nameToIdentity(const InstancePtr& instance, const string& name)
+{
+ // Identity is instanceName>/topic.<topicname>
+ Ice::Identity id;
+ id.category = instance->instanceName();
+ id.name = "topic." + name;
+
+ return id;
+}
+
}
-TopicManagerI::TopicManagerI(
- const InstancePtr& instance,
- const Ice::ObjectAdapterPtr& topicAdapter,
- const string& envName,
- const string& dbName) :
+TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) :
+
_instance(instance),
- _topicAdapter(topicAdapter),
- _envName(envName),
- _dbName(dbName),
- _connection(Freeze::createConnection(instance->communicator(), envName)),
- _topics(_connection, dbName)
+ _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
+ _llumap(_connection, "llu"),
+ _subscriberMap(_connection, "subscribers")
{
- //
- // Recreate each of the topics in the persistent map
- //
- for(PersistentTopicMap::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
+ try
+ {
+ __setNoDelete(true);
+
+ // TODO: If we want to improve the performance of the
+ // non-replicated case we could allocate a null-topic manager impl
+ // here.
+ _managerImpl = new TopicManagerI(instance, this);
+
+ Ice::PropertiesPtr properties = _instance->communicator()->getProperties();
+ // If there is no node adapter we don't need to start the
+ // observer, nor sync since we're not replicating.
+ if(_instance->nodeAdapter())
+ {
+ _observerImpl = new ReplicaObserverI(instance, this);
+ _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
+ _syncImpl = new TopicManagerSyncI(this);
+ _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
+ }
+
+ // Ensure that the llu counter is present in the log.
+ LLUMap::const_iterator ci = _llumap.find("_manager");
+ if(ci == _llumap.end())
+ {
+ LogUpdate empty = {0, 0};
+ _llumap.put(LLUMap::value_type("_manager", empty));
+ }
+
+ // Recreate each of the topics.
+ SubscriberMap::const_iterator p = _subscriberMap.begin();
+ while(p != _subscriberMap.end())
+ {
+ // This record has to be a place holder record, otherwise
+ // there is a database bug.
+ assert(p->first.id.name.empty() && p->first.id.category.empty());
+
+ Ice::Identity topic = p->first.topic;
+
+ // Skip the place holder.
+ ++p;
+
+ SubscriberRecordSeq content;
+ while(p != _subscriberMap.end() && p->first.topic == topic)
+ {
+ content.push_back(p->second);
+ ++p;
+ }
+
+ string name = identityToTopicName(topic);
+ installTopic(name, topic, false, content);
+ }
+ }
+ catch(...)
{
- installTopic(identityToTopicName(p->first), p->first, p->second, false);
+ shutdown();
+ __setNoDelete(false);
+ throw;
}
+ __setNoDelete(false);
}
-TopicManagerI::~TopicManagerI()
+TopicManagerImpl::~TopicManagerImpl()
{
+ //cout << "~TopicManagerImpl" << endl;
}
TopicPrx
-TopicManagerI::create(const string& name, const Ice::Current&)
+TopicManagerImpl::create(const string& name)
{
- IceUtil::Mutex::Lock sync(*this);
+ Lock sync(*this);
reap();
- if(_topicIMap.find(name) != _topicIMap.end())
+ if(_topics.find(name) != _topics.end())
{
TopicExists ex;
ex.name = name;
@@ -85,122 +373,479 @@ TopicManagerI::create(const string& name, const Ice::Current&)
}
// Identity is instanceName>/topic.<topicname>
- Ice::Identity id;
- id.category = _instance->instanceName();
- id.name = "topic." + name;
+ Ice::Identity id = nameToIdentity(_instance, name);
+ LogUpdate llu;
- _topics.put(PersistentTopicMap::value_type(id, LinkRecordSeq()));
+ for(;;)
+ {
+ try
+ {
- return installTopic(name, id, LinkRecordSeq(), true);
+ Freeze::TransactionHolder txn(_connection);
+ SubscriberRecordKey key;
+ key.topic = id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
+ _subscriberMap.put(SubscriberMap::value_type(key, rec));
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+
+ _instance->observers()->createTopic(llu, name);
+
+ return installTopic(name, id, true);
}
TopicPrx
-TopicManagerI::retrieve(const string& name, const Ice::Current&) const
+TopicManagerImpl::retrieve(const string& name) const
{
- IceUtil::Mutex::Lock sync(*this);
+ Lock sync(*this);
- TopicManagerI* This = const_cast<TopicManagerI*>(this);
+ TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
This->reap();
- TopicIMap::const_iterator p = _topicIMap.find(name);
- if(p == _topicIMap.end())
+ map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
+ if(p == _topics.end())
{
NoSuchTopic ex;
ex.name = name;
throw ex;
}
- // Here we cannot just reconstruct the identity since the
- // identity could be either instanceName/topic name, or if
- // created with pre-3.2 IceStorm / topic name.
- return TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id()));
+ return p->second->proxy();
}
TopicDict
-TopicManagerI::retrieveAll(const Ice::Current&) const
+TopicManagerImpl::retrieveAll() const
{
- IceUtil::Mutex::Lock sync(*this);
+ Lock sync(*this);
- TopicManagerI* This = const_cast<TopicManagerI*>(this);
+ TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
This->reap();
TopicDict all;
- for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p)
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- //
- // Here we cannot just reconstruct the identity since the
- // identity could be either "<instanceName>/topic.<topicname>"
- // name, or if created with pre-3.2 IceStorm "/<topicname>".
- //
- all.insert(TopicDict::value_type(
- p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id()))));
+ all.insert(TopicDict::value_type(p->first, p->second->proxy()));
}
return all;
}
-Ice::SliceChecksumDict
-TopicManagerI::getSliceChecksums(const Ice::Current&) const
+void
+TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& content)
{
- return Ice::sliceChecksums();
+ Lock sync(*this);
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topicMgr > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
+ out << "init";
+ for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
+ {
+ out << " topic: " << _instance->communicator()->identityToString(p->id) << " subscribers: ";
+ for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
+ {
+ if(q != p->records.begin())
+ {
+ out << ",";
+ }
+ out << _instance->communicator()->identityToString(q->id);
+ }
+ }
+ }
+
+ // First we update the database state, and then we update our
+ // internal state.
+ for(;;)
+ {
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+ _subscriberMap.clear();
+
+ _llumap.put(LLUMap::value_type("_manager", llu));
+
+ for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
+ {
+ SubscriberRecordKey key;
+ key.topic = p->id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
+ _subscriberMap.put(SubscriberMap::value_type(key, rec));
+ for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
+ {
+ SubscriberRecordKey key;
+ key.topic = p->id;
+ key.id = q->id;
+ _subscriberMap.put(SubscriberMap::value_type(key, *q));
+ }
+ }
+
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+
+ // We do this with two scans. The first runs through the topics
+ // that we have and removes those not in the init list. The second
+ // runs through the init list and either adds the ones that don't
+ // exist, or updates those that do.
+
+ map<string, TopicImplPtr>::iterator p = _topics.begin();
+ while(p != _topics.end())
+ {
+ TopicContentSeq::const_iterator q;
+ for(q = content.begin(); q != content.end(); ++q)
+ {
+ if(q->id == p->second->id())
+ {
+ break;
+ }
+ }
+
+ if(q == content.end())
+ {
+ // Note that this destroy should not remove anything from
+ // the database since we've already synced up the db
+ // state.
+ //
+ // TODO: We could short circuit the database operations in
+ // the topic by calling a third form of destroy.
+ p->second->observerDestroyTopic(llu);
+ _topics.erase(p++);
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ // Now run through the contents updating the topics that do exist,
+ // and creating those that do not.
+ for(TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q)
+ {
+ string name = identityToTopicName(q->id);
+ map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
+ if(p == _topics.end())
+ {
+ installTopic(name, q->id, true, q->records);
+ }
+ else
+ {
+ p->second->update(q->records);
+ }
+ }
+ // Clear the set of observers.
+ _instance->observers()->clear();
}
void
-TopicManagerI::reap()
+TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
{
- //
- // Always called with mutex locked.
- //
- // IceUtil::Mutex::Lock sync(*this);
- //
- TopicIMap::iterator i = _topicIMap.begin();
- while(i != _topicIMap.end())
+ Lock sync(*this);
+ Ice::Identity id = nameToIdentity(_instance, name);
+
+ for(;;)
{
- if(i->second->destroyed())
+ try
{
- Ice::Identity id = i->second->id();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topicMgr > 0)
+ Freeze::TransactionHolder txn(_connection);
+ SubscriberRecordKey key;
+ key.topic = id;
+ SubscriberRecord rec;
+ rec.link = false;
+ rec.cost = 0;
+ SubscriberMap::const_iterator q = _subscriberMap.find(key);
+ if(q != _subscriberMap.end())
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
- out << "Reaping " << i->first;
+ throw ObserverInconsistencyException("topic exists: " + name);
}
+ _subscriberMap.put(SubscriberMap::value_type(key, rec));
+ _llumap.put(LLUMap::value_type("_manager", llu));
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+ installTopic(name, id, true);
+}
+
+void
+TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name)
+{
+ Lock sync(*this);
+
+ map<string, TopicImplPtr>::iterator q = _topics.find(name);
+ if(q == _topics.end())
+ {
+ throw ObserverInconsistencyException("no topic: " + name);
+ }
+ q->second->observerDestroyTopic(llu);
+
+ _topics.erase(q);
+}
- _topics.erase(id);
+void
+TopicManagerImpl::observerAddSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& record)
+{
+ TopicImplPtr topic;
+ {
+ Lock sync(*this);
+
+ map<string, TopicImplPtr>::iterator q = _topics.find(name);
+ if(q == _topics.end())
+ {
+ throw ObserverInconsistencyException("no topic: " + name);
+ }
+ assert(q != _topics.end());
+ topic = q->second;
+ }
+ topic->observerAddSubscriber(llu, record);
+}
- try
+void
+TopicManagerImpl::observerRemoveSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
+{
+ TopicImplPtr topic;
+ {
+ Lock sync(*this);
+
+ map<string, TopicImplPtr>::iterator q = _topics.find(name);
+ if(q == _topics.end())
+ {
+ throw ObserverInconsistencyException("no topic: " + name);
+ }
+ assert(q != _topics.end());
+ topic = q->second;
+ }
+ topic->observerRemoveSubscriber(llu, id);
+}
+
+void
+TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content)
+{
+ {
+ Lock sync(*this);
+ reap();
+ }
+
+ // Reads are not synchronized and therefore must use a separate
+ // connection.
+ const Freeze::ConnectionPtr connection = Freeze::createConnection(_instance->communicator(),
+ _instance->serviceName());
+ const LLUMap llumap(connection, "llu");
+
+ for(;;)
+ {
+ try
+ {
+ content.clear();
+ Freeze::TransactionHolder txn(connection);
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- _topicAdapter->remove(id);
+ TopicContent rec = p->second->getContent();
+ content.push_back(rec);
}
- catch(const Ice::ObjectAdapterDeactivatedException&)
+
+ LLUMap::const_iterator ci = llumap.find("_manager");
+ assert(ci != llumap.end());
+ llu = ci->second;
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+}
+
+LogUpdate
+TopicManagerImpl::getLastLogUpdate() const
+{
+ const Freeze::ConnectionPtr connection = Freeze::createConnection(
+ _instance->communicator(), _instance->serviceName());
+ const LLUMap llumap(connection, "llu");
+
+ for(;;)
+ {
+ try
+ {
+ LLUMap::const_iterator ci = llumap.find("_manager");
+ return ci->second;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+}
+
+void
+TopicManagerImpl::sync(const Ice::ObjectPrx& master)
+{
+ TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master);
+
+ LogUpdate llu;
+ TopicContentSeq content;
+ sync->getContent(llu, content);
+
+ observerInit(llu, content);
+}
+
+void
+TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu)
+{
+ Lock sync(*this);
+
+ reap();
+
+ TopicContentSeq content;
+
+ // Update the database llu. This prevents the following case:
+ //
+ // Three replicas 1, 2, 3. 3 is the master. It accepts a change
+ // (say A=10, old value 9), writes to disk and then crashes. Now 2
+ // becomes the master. The client can ask this master for A and it
+ // returns 9. Now 3 comes back online, it has the last database
+ // state, so it syncs this state with 1, 2. The client will now
+ // magically get A==10. The solution here is when a new master is
+ // elected and gets the latest database state it immediately
+ // updates the llu stamp.
+ //
+ for(;;)
+ {
+ try
+ {
+ content.clear();
+
+ Freeze::TransactionHolder txn(_connection);
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- // Ignore
+ TopicContent rec = p->second->getContent();
+ content.push_back(rec);
}
- _topicIMap.erase(i++);
+ LLUMap::iterator ci = _llumap.find("_manager");
+ ci.set(llu);
+
+ txn.commit();
+ break;
}
- else
+ catch(const Freeze::DeadlockException&)
{
- ++i;
+ continue;
}
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
}
+
+ // Now initialize the observers.
+ _instance->observers()->init(slaves, llu, content);
+}
+
+Ice::ObjectPrx
+TopicManagerImpl::getObserver() const
+{
+ return _observer;
+}
+
+Ice::ObjectPrx
+TopicManagerImpl::getSync() const
+{
+ return _sync;
}
void
-TopicManagerI::shutdown()
+TopicManagerImpl::reap()
{
- IceUtil::Mutex::Lock sync(*this);
+ //
+ // Always called with mutex locked.
+ //
+ // Lock sync(*this);
+ //
+ map<string, TopicImplPtr>::iterator p = _topics.begin();
+ while(p != _topics.end())
+ {
+ if(p->second->destroyed())
+ {
+ _topics.erase(p++);
+ }
+ else
+ {
+ ++p;
+ }
+ }
+}
- reap();
+void
+TopicManagerImpl::shutdown()
+{
+ Lock sync(*this);
- for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p)
+ for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- p->second->reap();
+ p->second->shutdown();
}
+ _topics.clear();
+
+ _observerImpl = 0;
+ _syncImpl = 0;
+ _managerImpl = 0;
+}
+
+Ice::ObjectPtr
+TopicManagerImpl::getServant() const
+{
+ return _managerImpl;
}
TopicPrx
-TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const LinkRecordSeq& rec, bool create)
+TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool create,
+ const IceStorm::SubscriberRecordSeq& subscribers)
{
//
// Called by constructor or with 'this' mutex locked.
@@ -221,15 +866,11 @@ TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const L
}
}
- //
// Create topic implementation
- //
- TopicIPtr topicI = new TopicI(_instance, name, id, rec, _envName, _dbName);
+ TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
- //
// The identity is the name of the Topic.
- //
- TopicPrx prx = TopicPrx::uncheckedCast(_topicAdapter->add(topicI, id));
- _topicIMap.insert(TopicIMap::value_type(name, topicI));
- return prx;
+ _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
+ _instance->topicAdapter()->add(topicImpl->getServant(), id);
+ return topicImpl->proxy();
}