diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 817 |
1 files changed, 729 insertions, 88 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index f5bc398b5d3..077a77e0f2a 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -10,19 +10,248 @@ #include <IceUtil/DisableWarnings.h> #include <IceStorm/TopicManagerI.h> #include <IceStorm/TopicI.h> -#include <IceStorm/SubscriberPool.h> -#include <IceStorm/BatchFlusher.h> #include <IceStorm/TraceLevels.h> #include <IceStorm/Instance.h> -#include <Freeze/Initialize.h> +#include <Freeze/Freeze.h> + +#include <IceStorm/NodeI.h> +#include <IceStorm/Observers.h> #include <Ice/SliceChecksums.h> #include <functional> -#include <ctype.h> -using namespace IceStorm; using namespace std; +using namespace IceStorm; +using namespace IceStormElection; + +namespace +{ + +void +halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex) +{ + { + Ice::Error error(com->getLogger()); + error << "fatal exception: " << ex << "\n*** Aborting application ***"; + } + + abort(); +} + +class TopicManagerI : public TopicManagerInternal +{ +public: + + TopicManagerI(const InstancePtr& instance, const TopicManagerImplPtr& impl) : + _instance(instance), _impl(impl) + { + } + + ~TopicManagerI() + { + //cout << "~TopicManagerI" << endl; + } + + virtual TopicPrx create(const string& id, const Ice::Current&) + { + while(true) + { + Ice::Long generation; + TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__); + if(master) + { + try + { + return master->create(id); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + return _impl->create(id); + } + } + } + + virtual TopicPrx retrieve(const string& id, const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->retrieve(id); + } + + virtual TopicDict retrieveAll(const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->retrieveAll(); + } + + virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current&) const + { + // This doesn't require the replication to be running. + return Ice::sliceChecksums(); + } + + virtual NodePrx getReplicaNode(const Ice::Current&) const + { + // This doesn't require the replication to be running. + return _instance->nodeProxy(); + } + +private: + + TopicManagerPrx getMaster(Ice::Long& generation, const char* file, int line) const + { + NodeIPtr node = _instance->node(); + if(node) + { + return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line)); + } + else + { + return TopicManagerPrx(); + } + } + + const InstancePtr _instance; + const TopicManagerImplPtr _impl; +}; + +class ReplicaObserverI : public ReplicaObserver +{ +public: + + ReplicaObserverI(const InstancePtr& instance, const TopicManagerImplPtr& impl) : + _instance(instance), + _impl(impl) + { + } + + ~ReplicaObserverI() + { + //cout << "~ReplicaObserverI" << endl; + } + + virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&) + { + NodeIPtr node = _instance->node(); + if(node) + { + node->checkObserverInit(llu.generation); + } + _impl->observerInit(llu, content); + } + + virtual void createTopic(const LogUpdate& llu, const string& name, const Ice::Current&) + { + try + { + ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); + //cout << "createTopic: " << llu.generation << " node generation: " << unlock.generation() << endl; + _impl->observerCreateTopic(llu, name); + } + catch(const ObserverInconsistencyException& e) + { + Ice::Warning warn(_instance->traceLevels()->logger); + warn << "ReplicaObserverI::create: ObserverInconsistencyException: " << e.reason; + _instance->node()->recovery(llu.generation); + throw; + } + } + + virtual void destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&) + { + try + { + ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); + _impl->observerDestroyTopic(llu, name); + } + catch(const ObserverInconsistencyException& e) + { + Ice::Warning warn(_instance->traceLevels()->logger); + warn << "ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.reason; + _instance->node()->recovery(llu.generation); + throw; + } + } + + virtual void addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec, + const Ice::Current&) + { + try + { + ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); + _impl->observerAddSubscriber(llu, name, rec); + } + catch(const ObserverInconsistencyException& e) + { + Ice::Warning warn(_instance->traceLevels()->logger); + warn << "ReplicaObserverI::add: ObserverInconsistencyException: " << e.reason; + _instance->node()->recovery(llu.generation); + throw; + } + } + + virtual void removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id, + const Ice::Current&) + { + try + { + ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__); + _impl->observerRemoveSubscriber(llu, name, id); + } + catch(const ObserverInconsistencyException& e) + { + Ice::Warning warn(_instance->traceLevels()->logger); + warn << "ReplicaObserverI::remove: ObserverInconsistencyException: " << e.reason; + _instance->node()->recovery(llu.generation); + throw; + } + } + +private: + + const InstancePtr _instance; + const TopicManagerImplPtr _impl; +}; + +class TopicManagerSyncI : public TopicManagerSync +{ +public: + + TopicManagerSyncI(const TopicManagerImplPtr& impl) : + _impl(impl) + { + } + + ~TopicManagerSyncI() + { + //cout << "~TopicManagerSyncI" << endl; + } + + virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&) + { + _impl->getContent(llu, content); + } + +private: + + const TopicManagerImplPtr _impl; +}; + +} namespace IceStorm { @@ -44,40 +273,99 @@ identityToTopicName(const Ice::Identity& id) return id.name.substr(6); } +Ice::Identity +nameToIdentity(const InstancePtr& instance, const string& name) +{ + // Identity is instanceName>/topic.<topicname> + Ice::Identity id; + id.category = instance->instanceName(); + id.name = "topic." + name; + + return id; +} + } -TopicManagerI::TopicManagerI( - const InstancePtr& instance, - const Ice::ObjectAdapterPtr& topicAdapter, - const string& envName, - const string& dbName) : +TopicManagerImpl::TopicManagerImpl(const InstancePtr& instance) : + _instance(instance), - _topicAdapter(topicAdapter), - _envName(envName), - _dbName(dbName), - _connection(Freeze::createConnection(instance->communicator(), envName)), - _topics(_connection, dbName) + _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())), + _llumap(_connection, "llu"), + _subscriberMap(_connection, "subscribers") { - // - // Recreate each of the topics in the persistent map - // - for(PersistentTopicMap::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + try + { + __setNoDelete(true); + + // TODO: If we want to improve the performance of the + // non-replicated case we could allocate a null-topic manager impl + // here. + _managerImpl = new TopicManagerI(instance, this); + + Ice::PropertiesPtr properties = _instance->communicator()->getProperties(); + // If there is no node adapter we don't need to start the + // observer, nor sync since we're not replicating. + if(_instance->nodeAdapter()) + { + _observerImpl = new ReplicaObserverI(instance, this); + _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl); + _syncImpl = new TopicManagerSyncI(this); + _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl); + } + + // Ensure that the llu counter is present in the log. + LLUMap::const_iterator ci = _llumap.find("_manager"); + if(ci == _llumap.end()) + { + LogUpdate empty = {0, 0}; + _llumap.put(LLUMap::value_type("_manager", empty)); + } + + // Recreate each of the topics. + SubscriberMap::const_iterator p = _subscriberMap.begin(); + while(p != _subscriberMap.end()) + { + // This record has to be a place holder record, otherwise + // there is a database bug. + assert(p->first.id.name.empty() && p->first.id.category.empty()); + + Ice::Identity topic = p->first.topic; + + // Skip the place holder. + ++p; + + SubscriberRecordSeq content; + while(p != _subscriberMap.end() && p->first.topic == topic) + { + content.push_back(p->second); + ++p; + } + + string name = identityToTopicName(topic); + installTopic(name, topic, false, content); + } + } + catch(...) { - installTopic(identityToTopicName(p->first), p->first, p->second, false); + shutdown(); + __setNoDelete(false); + throw; } + __setNoDelete(false); } -TopicManagerI::~TopicManagerI() +TopicManagerImpl::~TopicManagerImpl() { + //cout << "~TopicManagerImpl" << endl; } TopicPrx -TopicManagerI::create(const string& name, const Ice::Current&) +TopicManagerImpl::create(const string& name) { - IceUtil::Mutex::Lock sync(*this); + Lock sync(*this); reap(); - if(_topicIMap.find(name) != _topicIMap.end()) + if(_topics.find(name) != _topics.end()) { TopicExists ex; ex.name = name; @@ -85,122 +373,479 @@ TopicManagerI::create(const string& name, const Ice::Current&) } // Identity is instanceName>/topic.<topicname> - Ice::Identity id; - id.category = _instance->instanceName(); - id.name = "topic." + name; + Ice::Identity id = nameToIdentity(_instance, name); + LogUpdate llu; - _topics.put(PersistentTopicMap::value_type(id, LinkRecordSeq())); + for(;;) + { + try + { - return installTopic(name, id, LinkRecordSeq(), true); + Freeze::TransactionHolder txn(_connection); + SubscriberRecordKey key; + key.topic = id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; + _subscriberMap.put(SubscriberMap::value_type(key, rec)); + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + + _instance->observers()->createTopic(llu, name); + + return installTopic(name, id, true); } TopicPrx -TopicManagerI::retrieve(const string& name, const Ice::Current&) const +TopicManagerImpl::retrieve(const string& name) const { - IceUtil::Mutex::Lock sync(*this); + Lock sync(*this); - TopicManagerI* This = const_cast<TopicManagerI*>(this); + TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this); This->reap(); - TopicIMap::const_iterator p = _topicIMap.find(name); - if(p == _topicIMap.end()) + map<string, TopicImplPtr>::const_iterator p = _topics.find(name); + if(p == _topics.end()) { NoSuchTopic ex; ex.name = name; throw ex; } - // Here we cannot just reconstruct the identity since the - // identity could be either instanceName/topic name, or if - // created with pre-3.2 IceStorm / topic name. - return TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id())); + return p->second->proxy(); } TopicDict -TopicManagerI::retrieveAll(const Ice::Current&) const +TopicManagerImpl::retrieveAll() const { - IceUtil::Mutex::Lock sync(*this); + Lock sync(*this); - TopicManagerI* This = const_cast<TopicManagerI*>(this); + TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this); This->reap(); TopicDict all; - for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p) + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - // - // Here we cannot just reconstruct the identity since the - // identity could be either "<instanceName>/topic.<topicname>" - // name, or if created with pre-3.2 IceStorm "/<topicname>". - // - all.insert(TopicDict::value_type( - p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id())))); + all.insert(TopicDict::value_type(p->first, p->second->proxy())); } return all; } -Ice::SliceChecksumDict -TopicManagerI::getSliceChecksums(const Ice::Current&) const +void +TopicManagerImpl::observerInit(const LogUpdate& llu, const TopicContentSeq& content) { - return Ice::sliceChecksums(); + Lock sync(*this); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topicMgr > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); + out << "init"; + for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + { + out << " topic: " << _instance->communicator()->identityToString(p->id) << " subscribers: "; + for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) + { + if(q != p->records.begin()) + { + out << ","; + } + out << _instance->communicator()->identityToString(q->id); + } + } + } + + // First we update the database state, and then we update our + // internal state. + for(;;) + { + try + { + Freeze::TransactionHolder txn(_connection); + _subscriberMap.clear(); + + _llumap.put(LLUMap::value_type("_manager", llu)); + + for(TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p) + { + SubscriberRecordKey key; + key.topic = p->id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; + _subscriberMap.put(SubscriberMap::value_type(key, rec)); + for(SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q) + { + SubscriberRecordKey key; + key.topic = p->id; + key.id = q->id; + _subscriberMap.put(SubscriberMap::value_type(key, *q)); + } + } + + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + + // We do this with two scans. The first runs through the topics + // that we have and removes those not in the init list. The second + // runs through the init list and either adds the ones that don't + // exist, or updates those that do. + + map<string, TopicImplPtr>::iterator p = _topics.begin(); + while(p != _topics.end()) + { + TopicContentSeq::const_iterator q; + for(q = content.begin(); q != content.end(); ++q) + { + if(q->id == p->second->id()) + { + break; + } + } + + if(q == content.end()) + { + // Note that this destroy should not remove anything from + // the database since we've already synced up the db + // state. + // + // TODO: We could short circuit the database operations in + // the topic by calling a third form of destroy. + p->second->observerDestroyTopic(llu); + _topics.erase(p++); + } + else + { + ++p; + } + } + + // Now run through the contents updating the topics that do exist, + // and creating those that do not. + for(TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q) + { + string name = identityToTopicName(q->id); + map<string, TopicImplPtr>::const_iterator p = _topics.find(name); + if(p == _topics.end()) + { + installTopic(name, q->id, true, q->records); + } + else + { + p->second->update(q->records); + } + } + // Clear the set of observers. + _instance->observers()->clear(); } void -TopicManagerI::reap() +TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name) { - // - // Always called with mutex locked. - // - // IceUtil::Mutex::Lock sync(*this); - // - TopicIMap::iterator i = _topicIMap.begin(); - while(i != _topicIMap.end()) + Lock sync(*this); + Ice::Identity id = nameToIdentity(_instance, name); + + for(;;) { - if(i->second->destroyed()) + try { - Ice::Identity id = i->second->id(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topicMgr > 0) + Freeze::TransactionHolder txn(_connection); + SubscriberRecordKey key; + key.topic = id; + SubscriberRecord rec; + rec.link = false; + rec.cost = 0; + SubscriberMap::const_iterator q = _subscriberMap.find(key); + if(q != _subscriberMap.end()) { - Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); - out << "Reaping " << i->first; + throw ObserverInconsistencyException("topic exists: " + name); } + _subscriberMap.put(SubscriberMap::value_type(key, rec)); + _llumap.put(LLUMap::value_type("_manager", llu)); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + installTopic(name, id, true); +} + +void +TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name) +{ + Lock sync(*this); + + map<string, TopicImplPtr>::iterator q = _topics.find(name); + if(q == _topics.end()) + { + throw ObserverInconsistencyException("no topic: " + name); + } + q->second->observerDestroyTopic(llu); + + _topics.erase(q); +} - _topics.erase(id); +void +TopicManagerImpl::observerAddSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& record) +{ + TopicImplPtr topic; + { + Lock sync(*this); + + map<string, TopicImplPtr>::iterator q = _topics.find(name); + if(q == _topics.end()) + { + throw ObserverInconsistencyException("no topic: " + name); + } + assert(q != _topics.end()); + topic = q->second; + } + topic->observerAddSubscriber(llu, record); +} - try +void +TopicManagerImpl::observerRemoveSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id) +{ + TopicImplPtr topic; + { + Lock sync(*this); + + map<string, TopicImplPtr>::iterator q = _topics.find(name); + if(q == _topics.end()) + { + throw ObserverInconsistencyException("no topic: " + name); + } + assert(q != _topics.end()); + topic = q->second; + } + topic->observerRemoveSubscriber(llu, id); +} + +void +TopicManagerImpl::getContent(LogUpdate& llu, TopicContentSeq& content) +{ + { + Lock sync(*this); + reap(); + } + + // Reads are not synchronized and therefore must use a separate + // connection. + const Freeze::ConnectionPtr connection = Freeze::createConnection(_instance->communicator(), + _instance->serviceName()); + const LLUMap llumap(connection, "llu"); + + for(;;) + { + try + { + content.clear(); + Freeze::TransactionHolder txn(connection); + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - _topicAdapter->remove(id); + TopicContent rec = p->second->getContent(); + content.push_back(rec); } - catch(const Ice::ObjectAdapterDeactivatedException&) + + LLUMap::const_iterator ci = llumap.find("_manager"); + assert(ci != llumap.end()); + llu = ci->second; + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } +} + +LogUpdate +TopicManagerImpl::getLastLogUpdate() const +{ + const Freeze::ConnectionPtr connection = Freeze::createConnection( + _instance->communicator(), _instance->serviceName()); + const LLUMap llumap(connection, "llu"); + + for(;;) + { + try + { + LLUMap::const_iterator ci = llumap.find("_manager"); + return ci->second; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } +} + +void +TopicManagerImpl::sync(const Ice::ObjectPrx& master) +{ + TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master); + + LogUpdate llu; + TopicContentSeq content; + sync->getContent(llu, content); + + observerInit(llu, content); +} + +void +TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu) +{ + Lock sync(*this); + + reap(); + + TopicContentSeq content; + + // Update the database llu. This prevents the following case: + // + // Three replicas 1, 2, 3. 3 is the master. It accepts a change + // (say A=10, old value 9), writes to disk and then crashes. Now 2 + // becomes the master. The client can ask this master for A and it + // returns 9. Now 3 comes back online, it has the last database + // state, so it syncs this state with 1, 2. The client will now + // magically get A==10. The solution here is when a new master is + // elected and gets the latest database state it immediately + // updates the llu stamp. + // + for(;;) + { + try + { + content.clear(); + + Freeze::TransactionHolder txn(_connection); + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - // Ignore + TopicContent rec = p->second->getContent(); + content.push_back(rec); } - _topicIMap.erase(i++); + LLUMap::iterator ci = _llumap.find("_manager"); + ci.set(llu); + + txn.commit(); + break; } - else + catch(const Freeze::DeadlockException&) { - ++i; + continue; } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } } + + // Now initialize the observers. + _instance->observers()->init(slaves, llu, content); +} + +Ice::ObjectPrx +TopicManagerImpl::getObserver() const +{ + return _observer; +} + +Ice::ObjectPrx +TopicManagerImpl::getSync() const +{ + return _sync; } void -TopicManagerI::shutdown() +TopicManagerImpl::reap() { - IceUtil::Mutex::Lock sync(*this); + // + // Always called with mutex locked. + // + // Lock sync(*this); + // + map<string, TopicImplPtr>::iterator p = _topics.begin(); + while(p != _topics.end()) + { + if(p->second->destroyed()) + { + _topics.erase(p++); + } + else + { + ++p; + } + } +} - reap(); +void +TopicManagerImpl::shutdown() +{ + Lock sync(*this); - for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p) + for(map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - p->second->reap(); + p->second->shutdown(); } + _topics.clear(); + + _observerImpl = 0; + _syncImpl = 0; + _managerImpl = 0; +} + +Ice::ObjectPtr +TopicManagerImpl::getServant() const +{ + return _managerImpl; } TopicPrx -TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const LinkRecordSeq& rec, bool create) +TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool create, + const IceStorm::SubscriberRecordSeq& subscribers) { // // Called by constructor or with 'this' mutex locked. @@ -221,15 +866,11 @@ TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const L } } - // // Create topic implementation - // - TopicIPtr topicI = new TopicI(_instance, name, id, rec, _envName, _dbName); + TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers); - // // The identity is the name of the Topic. - // - TopicPrx prx = TopicPrx::uncheckedCast(_topicAdapter->add(topicI, id)); - _topicIMap.insert(TopicIMap::value_type(name, topicI)); - return prx; + _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl)); + _instance->topicAdapter()->add(topicImpl->getServant(), id); + return topicImpl->proxy(); } |