diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 1421 |
1 files changed, 1126 insertions, 295 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 1f3ea907310..a391ab306de 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -12,20 +12,33 @@ #include <IceStorm/Instance.h> #include <IceStorm/Subscriber.h> #include <IceStorm/TraceLevels.h> -#include <IceStorm/SubscriberPool.h> +#include <IceStorm/NodeI.h> +#include <IceStorm/Observers.h> #include <Ice/LoggerUtil.h> -#include <Freeze/Initialize.h> +#include <Freeze/Freeze.h> #include <algorithm> -using namespace IceStorm; using namespace std; +using namespace IceStorm; +using namespace IceStormElection; namespace { +void +halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex) +{ + { + Ice::Error error(com->getLogger()); + error << "fatal exception: " << ex << "\n*** Aborting application ***"; + } + + abort(); +} + // // The servant has a 1-1 association with a topic. It is used to // receive events from Publishers. @@ -34,16 +47,22 @@ class PublisherI : public Ice::BlobjectArray { public: - PublisherI(const TopicIPtr& topic) : - _topic(topic) + PublisherI(const TopicImplPtr& topic, const InstancePtr& instance) : + _topic(topic), _instance(instance) { } + ~PublisherI() + { + //cout << "~PublisherI" << endl; + } + virtual bool ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, Ice::ByteSeq&, const Ice::Current& current) { + // The publish call does a cached read. EventDataPtr event = new EventData( current.operation, current.mode, @@ -66,7 +85,8 @@ public: private: - const TopicIPtr _topic; + const TopicImplPtr _topic; + const InstancePtr _instance; }; // @@ -77,118 +97,440 @@ class TopicLinkI : public TopicLink { public: - TopicLinkI(const TopicIPtr& topic) : - _topic(topic) + TopicLinkI(const TopicImplPtr& impl, const InstancePtr& instance) : + _impl(impl), _instance(instance) { } + ~TopicLinkI() + { + //cout << "~TopicLinkI" << endl; + } + virtual void forward(const EventDataSeq& v, const Ice::Current& current) { - _topic->publish(true, v); + // The publish call does a cached read. + _impl->publish(true, v); + } + +private: + + const TopicImplPtr _impl; + const InstancePtr _instance; +}; + +class TopicI : public TopicInternal +{ +public: + + TopicI(const TopicImplPtr& impl, const InstancePtr& instance) : + _impl(impl), _instance(instance) + { + } + + ~TopicI() + { + //cout << "~TopicI" << endl; + } + + virtual string getName(const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->getName(); + } + + virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->getPublisher(); + } + + virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->getNonReplicatedPublisher(); + } + + virtual void subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + master->subscribe(qos, obj); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + _impl->subscribe(qos, obj); + } + break; + } + } + + virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, + const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + return master->subscribeAndGetPublisher(qos, obj); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + return _impl->subscribeAndGetPublisher(qos, obj); + } + } + } + + virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + master->unsubscribe(subscriber); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + _impl->unsubscribe(subscriber); + } + break; + } + } + + virtual TopicLinkPrx getLinkProxy(const Ice::Current&) + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->getLinkProxy(); + } + + virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& current) + { + NodeIPtr node = _instance->node(); + if(!node->updateMaster(__FILE__, __LINE__)) + { + throw ReapWouldBlock(); + } + FinishUpdateHelper unlock(node); + _impl->reap(ids); + } + + virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + master->link(topic, cost); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + _impl->link(topic, cost); + } + break; + } + } + + virtual void unlink(const TopicPrx& topic, const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + master->unlink(topic); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + _impl->unlink(topic); + } + break; + } + } + + virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const + { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + return _impl->getLinkInfoSeq(); + } + + virtual void destroy(const Ice::Current& current) + { + while(true) + { + Ice::Long generation = -1; + TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + if(master) + { + try + { + master->destroy(); + } + catch(const Ice::ConnectFailedException&) + { + _instance->node()->recovery(generation); + continue; + } + catch(const Ice::TimeoutException&) + { + _instance->node()->recovery(generation); + continue; + } + } + else + { + FinishUpdateHelper unlock(_instance->node()); + _impl->destroy(); + } + break; + } } private: - const TopicIPtr _topic; + TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const + { + NodeIPtr node = _instance->node(); + Ice::ObjectPrx master; + if(node) + { + master = _instance->node()->startUpdate(generation, file, line); + } + return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx(); + } + + const TopicImplPtr _impl; + const InstancePtr _instance; }; } + namespace IceStorm { extern string identityToTopicName(const Ice::Identity& id); } -TopicI::TopicI( +TopicImpl::TopicImpl( const InstancePtr& instance, const string& name, const Ice::Identity& id, - const LinkRecordSeq& topicRecord, - const string& envName, - const string& dbName) : + const SubscriberRecordSeq& subscribers) : _instance(instance), _name(name), _id(id), - _connection(Freeze::createConnection(instance->communicator(), envName)), - _topics(_connection, dbName, false), - _topicRecord(topicRecord), + _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())), + _subscriberMap(_connection, "subscribers"), + _llumap(_connection, "llu"), _destroyed(false) { - // - // Create a servant per topic to receive event data. If the - // category is empty then we are in backwards compatibility - // mode. In this case the servant's identity is - // category=<topicname>, name=publish, otherwise the name is - // <instancename>/publisher.<topicname>. The same applies to the - // link proxy. - // - // Activate the object and save a reference to give to publishers. - // - Ice::Identity pubid; - Ice::Identity linkid; - if(id.category.empty()) - { - pubid.category = _name; - pubid.name = "publish"; - linkid.category = _name; - linkid.name = "link"; - } - else + try { - pubid.category = id.category; - pubid.name = _name + ".publish"; - linkid.category = id.category; - linkid.name = _name + ".link"; - } - - _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid); - _linkPrx = TopicLinkPrx::uncheckedCast(_instance->objectAdapter()->add(new TopicLinkI(this), linkid)); + __setNoDelete(true); + // TODO: If we want to improve the performance of the + // non-replicated case we could allocate a null-topic impl here. + _servant = new TopicI(this, instance); - // - // Re-establish linked subscribers. - // - for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) - { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) + // + // Create a servant per topic to receive event data. If the + // category is empty then we are in backwards compatibility + // mode. In this case the servant's identity is + // category=<topicname>, name=publish, otherwise the name is + // <instancename>/<topicname>.publish. The same applies to the + // link proxy. + // + // Activate the object and save a reference to give to publishers. + // + Ice::Identity pubid; + Ice::Identity linkid; + if(id.category.empty()) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity()); + pubid.category = _name; + pubid.name = "publish"; + linkid.category = _name; + linkid.name = "link"; + } + else + { + pubid.category = id.category; + pubid.name = _name + ".publish"; + linkid.category = id.category; + linkid.name = _name + ".link"; } + _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid); + _linkPrx = TopicLinkPrx::uncheckedCast( + _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid)); + // - // Create the subscriber object add it to the set of - // subscribers. + // Re-establish subscribers. // - SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost); - _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + for(SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p) + { + Ice::Identity id = p->obj->ice_getIdentity(); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " recreate " << _instance->communicator()->identityToString(id); + } + + try + { + // + // Create the subscriber object add it to the set of + // subscribers. + // + SubscriberPtr subscriber = Subscriber::create(_instance, *p); + _subscribers.push_back(subscriber); + } + catch(const Ice::Exception& ex) + { + Ice::Warning out(traceLevels->logger); + out << _name << " recreate " << _instance->communicator()->identityToString(id) << " failed: " << ex; + continue; + } + } } + catch(...) + { + shutdown(); + __setNoDelete(false); + throw; + } + __setNoDelete(false); } -TopicI::~TopicI() +TopicImpl::~TopicImpl() { + //cout << "~TopicImpl" << endl; } string -TopicI::getName(const Ice::Current&) const +TopicImpl::getName() const { // Immutable return _name; } Ice::ObjectPrx -TopicI::getPublisher(const Ice::Current&) const +TopicImpl::getPublisher() const { // Immutable + if(_instance->publisherReplicaProxy()) + { + return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity()); + } return _publisherPrx; } +Ice::ObjectPrx +TopicImpl::getNonReplicatedPublisher() const +{ + // If there is an adapter id configured then we're using icegrid + // so create an indirect proxy, otherwise create a direct proxy. + if(!_publisherPrx->ice_getAdapterId().empty()) + { + return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity()); + } + else + { + return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity()); + } +} + // // COMPILERFIX: For some reason with VC6 find reports an error. // #if defined(_MSC_VER) && (_MSC_VER < 1300) -vector<SubscriberPtr>::iterator +namespace +{ +static vector<SubscriberPtr>::iterator find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident) { while(start != end) @@ -201,10 +543,29 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, } return end; } +} #endif +namespace +{ +static void +trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s) +{ + out << '['; + for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p) + { + if(p != s.begin()) + { + out << ","; + } + out << instance->communicator()->identityToString((*p)->id()); + } + out << "]"; +} +} + void -TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) +TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -212,7 +573,7 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); + out << _name << ": subscribe: " << _instance->communicator()->identityToString(id); if(traceLevels->topic > 1) { out << " QoS: "; @@ -224,10 +585,11 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr } out << '[' << p->first << "," << p->second << ']'; } + out << " subscriptions: "; + trace(out, _instance, _subscribers); } } - string reliability = "oneway"; { QoS::iterator p = qos.find("reliability"); @@ -273,28 +635,103 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr } IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); + SubscriberRecord record; + record.id = id; + record.obj = newObj; + record.theQoS = qos; + record.topicName = _name; + record.link = false; + record.cost = 0; + + LogUpdate llu; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); if(p != _subscribers.end()) { + // If we already have this subscriber remove it from our + // subscriber list and remove it from the database. (*p)->destroy(); - _instance->subscriberPool()->remove(*p); _subscribers.erase(p); + + for(;;) + { + try + { + Freeze::TransactionHolder txn(_connection); + SubscriberRecordKey key; + key.topic = _id; + key.id = record.id; + SubscriberMap::iterator e = _subscriberMap.find(key); + if(e != _subscriberMap.end()) + { + _subscriberMap.erase(e); + } + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + Ice::IdentitySeq ids; + ids.push_back(id); + _instance->observers()->removeSubscriber(llu, _name, ids); + } + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + for(;;) + { + try + { + Freeze::TransactionHolder txn(_connection); + SubscriberRecordKey key; + key.topic = _id; + key.id = subscriber->id(); + _subscriberMap.put(SubscriberMap::value_type(key, record)); + // Update the LLU. + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } } - SubscriberPtr subscriber = Subscriber::create(_instance, _name, newObj, qos); _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + + _instance->observers()->addSubscriber(llu, _name, record); } Ice::ObjectPrx -TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) { Ice::Identity id = obj->ice_getIdentity(); + TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); + out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id); if(traceLevels->topic > 1) { out << " QoS: "; @@ -304,27 +741,70 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons { out << ','; } - out << '[' << p->first << "," << p->second << ']'; + } + out << " subscriptions: "; + trace(out, _instance, _subscribers); } } IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); + + SubscriberRecord record; + record.id = id; + record.obj = obj; + record.theQoS = qos; + record.topicName = _name; + record.link = false; + record.cost = 0; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); if(p != _subscribers.end()) { throw AlreadySubscribed(); } - SubscriberPtr subscriber = Subscriber::create(_instance, _name, obj, qos); + LogUpdate llu; + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + for(;;) + { + try + { + Freeze::TransactionHolder txn(_connection); + + SubscriberRecordKey key; + key.topic = _id; + key.id = subscriber->id(); + _subscriberMap.put(SubscriberMap::value_type(key, record)); + + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + + _instance->observers()->addSubscriber(llu, _name, record); return subscriber->proxy(); } void -TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) +TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber) { TraceLevelsPtr traceLevels = _instance->traceLevels(); if(!subscriber) @@ -342,110 +822,124 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Unsubscribe: " << _instance->communicator()->identityToString(id); + out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + trace(out, _instance, _subscribers); + } } - // - // Unsubscribe the subscriber with this identity. - // - removeSubscriber(subscriber); + IceUtil::Mutex::Lock sync(_subscribersMutex); + Ice::IdentitySeq ids; + ids.push_back(id); + removeSubscribers(ids); } TopicLinkPrx -TopicI::getLinkProxy(const Ice::Current&) +TopicImpl::getLinkProxy() { // immutable + if(_instance->publisherReplicaProxy()) + { + return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity( + _linkPrx->ice_getIdentity())); + } return _linkPrx; } void -TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) +TopicImpl::link(const TopicPrx& topic, Ice::Int cost) { TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic); TopicLinkPrx link = internal->getLinkProxy(); - IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); - if(_destroyed) + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity()) + << " cost " << cost; } - reap(); + IceUtil::Mutex::Lock sync(_subscribersMutex); Ice::Identity id = topic->ice_getIdentity(); - string name = identityToTopicName(id); - // Validate that this topic doesn't already have an established - // link. - for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) + SubscriberRecord record; + record.id = id; + record.obj = link; + record.theTopic = topic; + record.topicName = _name; + record.link = true; + record.cost = cost; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); + if(p != _subscribers.end()) { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - LinkExists ex; - ex.name = name; - throw ex; - } + string name = identityToTopicName(id); + LinkExists ex; + ex.name = name; + throw ex; } - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) + + LogUpdate llu; + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + + for(;;) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " link " << _instance->communicator()->identityToString(id) - << " cost " << cost; + try + { + Freeze::TransactionHolder txn(_connection); + + SubscriberRecordKey key; + key.topic = _id; + key.id = id; + _subscriberMap.put(SubscriberMap::value_type(key, record)); + + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } } - SubscriberPtr subscriber = Subscriber::create(_instance, link, cost); - - // - // Create the LinkRecord - // - LinkRecord record; - record.obj = link; - record.cost = cost; - record.theTopic = topic; - - // - // Save - // - _topicRecord.push_back(record); - _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); - - IceUtil::Mutex::Lock subscriberSync(_subscribersMutex); _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + + _instance->observers()->addSubscriber(llu, _name, record); } void -TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) +TopicImpl::unlink(const TopicPrx& topic) { - IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); + IceUtil::Mutex::Lock sync(_subscribersMutex); if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__); } - reap(); - Ice::Identity id = topic->ice_getIdentity(); - string name = identityToTopicName(id); - LinkRecordSeq::iterator p = _topicRecord.begin(); - while(p != _topicRecord.end()) - { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - break; - } - ++p; - } - if(p == _topicRecord.end()) + vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id); + if(p == _subscribers.end()) { + string name = identityToTopicName(id); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << name << " failed - not linked"; + out << _name << ": unlink " << name << " failed - not linked"; } NoSuchLink ex; @@ -453,48 +947,79 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) throw ex; } - Ice::ObjectPrx subscriber = p->obj; - _topicRecord.erase(p); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << _instance->communicator()->identityToString(id); + } + + Ice::IdentitySeq ids; + ids.push_back(id); + removeSubscribers(ids); +} - // - // Save - // - _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); +void +TopicImpl::reap(const Ice::IdentitySeq& ids) +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << _instance->communicator()->identityToString(id); + out << _name << ": reap "; + for(Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end() ; ++p) + { + if(p != ids.begin()) + { + out << ","; + } + out << _instance->communicator()->identityToString(*p); + } + } + + removeSubscribers(ids); +} + +void +TopicImpl::shutdown() +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); + _servant = 0; + + // Shutdown each subscriber. This waits for the event queues to drain. + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->shutdown(); } - removeSubscriber(subscriber); } LinkInfoSeq -TopicI::getLinkInfoSeq(const Ice::Current&) const +TopicImpl::getLinkInfoSeq() const { - IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); - TopicI* This = const_cast<TopicI*>(this); - This->reap(); - + IceUtil::Mutex::Lock sync(_subscribersMutex); + LinkInfoSeq seq; - - for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q) + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) { - LinkInfo info; - info.name = identityToTopicName(q->theTopic->ice_getIdentity()); - info.cost = q->cost; - info.theTopic = q->theTopic; - seq.push_back(info); + SubscriberRecord record = (*p)->record(); + if(record.link && !(*p)->errored()) + { + LinkInfo info; + info.name = identityToTopicName(record.theTopic->ice_getIdentity()); + info.cost = record.cost; + info.theTopic = record.theTopic; + seq.push_back(info); + } } - return seq; } void -TopicI::destroy(const Ice::Current&) +TopicImpl::destroy() { - IceUtil::RecMutex::Lock sync(_topicRecordMutex); + IceUtil::Mutex::Lock sync(_subscribersMutex); if(_destroyed) { @@ -502,208 +1027,514 @@ TopicI::destroy(const Ice::Current&) } _destroyed = true; - try + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": destroy"; + } + + // destroyInternal clears out the topic content. + LogUpdate llu = {0,0}; + _instance->observers()->destroyTopic(destroyInternal(llu, true), _name); +} + +TopicContent +TopicImpl::getContent() const +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); + + TopicContent content; + content.id = _id; + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + // Don't return errored subscribers (subscribers that have + // errored out, but not reaped due to a failure with the + // master). This means we can avoid the reaping step later. + if(!(*p)->errored()) + { + content.records.push_back((*p)->record()); + } + } + return content; +} + +void +TopicImpl::update(const SubscriberRecordSeq& records) +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); + + // We do this with two scans. The first runs through the subscribers + // that we have and removes those not in the init list. The second + // runs through the init list and add the ones that don't + // exist. + { - _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity()); - _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity()); + vector<SubscriberPtr>::iterator p = _subscribers.begin(); + while(p != _subscribers.end()) + { + SubscriberRecordSeq::const_iterator q; + for(q = records.begin(); q != records.end(); ++q) + { + if((*p)->id() == q->id) + { + break; + } + } + // The subscriber doesn't exist in the incoming subscriber + // set so destroy it. + if(q == records.end()) + { + (*p)->destroy(); + p = _subscribers.erase(p); + } + else + { + // Otherwise reset the reaped status if necessary. + (*p)->resetIfReaped(); + ++p; + } + } } - catch(const Ice::ObjectAdapterDeactivatedException&) + + for(SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p) { - // Ignore -- this could occur on shutdown. + vector<SubscriberPtr>::iterator q; + for(q = _subscribers.begin(); q != _subscribers.end(); ++q) + { + if((*q)->id() == p->id) + { + break; + } + } + if(q == _subscribers.end()) + { + SubscriberPtr subscriber = Subscriber::create(_instance, *p); + _subscribers.push_back(subscriber); + } } } bool -TopicI::destroyed() const +TopicImpl::destroyed() const { - IceUtil::RecMutex::Lock sync(_topicRecordMutex); + IceUtil::Mutex::Lock sync(_subscribersMutex); return _destroyed; } Ice::Identity -TopicI::id() const +TopicImpl::id() const { // immutable return _id; } -void -TopicI::reap() +TopicPrx +TopicImpl::proxy() const { - IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); - if(_destroyed) + // immutable + Ice::ObjectPrx prx; + if(_instance->topicReplicaProxy()) { - return; + prx = _instance->topicReplicaProxy()->ice_identity(_id); + } + else + { + prx = _instance->topicAdapter()->createProxy(_id); } - bool updated = false; + return TopicPrx::uncheckedCast(prx); +} - // - // Run through all invalid subscribers and remove them from the - // database. - // - list<SubscriberPtr> error; +namespace +{ + +class TopicInternal_reapI : public AMI_TopicInternal_reap +{ +public: + + TopicInternal_reapI(const InstancePtr& instance, Ice::Long generation) : + _instance(instance), _generation(generation) { - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.swap(error); } - TraceLevelsPtr traceLevels = _instance->traceLevels(); - for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p) + virtual void ice_response() + { + } + + virtual void ice_exception(const Ice::Exception& ex) + { + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "exception when calling `reap' on the master replica: " << ex; + } + _instance->node()->recovery(_generation); + } + +private: + + const InstancePtr _instance; + const Ice::Long _generation; +}; + +} + +void +TopicImpl::publish(bool forwarded, const EventDataSeq& events) +{ + TopicInternalPrx masterInternal; + Ice::Long generation = -1; + Ice::IdentitySeq reap; { - SubscriberPtr subscriber = *p; - assert(subscriber->persistent()); // Only persistent subscribers need to be reaped. + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); - bool found = false; // - // If this turns out to be a performance problem then we - // can create an in memory map cache. + // Copy of the subscriber list so that event publishing can occur + // in parallel. // - LinkRecordSeq::iterator q = _topicRecord.begin(); - while(q != _topicRecord.end()) + vector<SubscriberPtr> copy; { - if(q->obj->ice_getIdentity() == subscriber->id()) - { - _topicRecord.erase(q); - updated = true; - found = true; - break; - } - ++q; + IceUtil::Mutex::Lock sync(_subscribersMutex); + copy = _subscribers; } - if(traceLevels->topic > 0) + + // + // Queue each event, gathering a list of those subscribers that + // must be reaped. + // + for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "reaping " << _instance->communicator()->identityToString(subscriber->id()); - if(!found) + if(!(*p)->queue(forwarded, events) && (*p)->reap()) { - out << ": failed - not in database"; + reap.push_back((*p)->id()); } } + + // If there are no subscribers in error then we're done. + if(reap.empty()) + { + return; + } + if(!unlock.getMaster()) + { + IceUtil::Mutex::Lock sync(_subscribersMutex); + removeSubscribers(reap); + return; + } + masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id)); + generation = unlock.generation(); } - if(updated) - { - _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); - } + + + // Tell the master to reap this set of subscribers. This is an + // AMI invocation so it shouldn't block the caller (in the + // typical case) we do it outside of the mutex lock for + // performance reasons. + // + // We must release the cached lock before calling this as the AMI + // call may raise an exception in the caller (that is directly + // call ice_exception) which calls recover() on the node which + // would result in a deadlock since the node is locked. + masterInternal->reap_async(new TopicInternal_reapI(_instance, generation), reap); } void -TopicI::publish(bool forwarded, const EventDataSeq& events) +TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& record) { - // - // Copy of the subscriber list so that event publishing can occur - // in parallel. - // - vector<SubscriberPtr> copy; + IceUtil::Mutex::Lock sync(_subscribersMutex); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) { - IceUtil::Mutex::Lock sync(_subscribersMutex); - copy = _subscribers; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end() ; ++p) + { + if(p != record.theQoS.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } + out << " llu: " << llu.generation << "/" << llu.iteration; } - // - // Queue each event. This results in two lists -- one the list of - // subscribers in error and the second a list of subscribers that - // need to be flushed. - // - vector<Ice::Identity> e; - list<SubscriberPtr> flush; - for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); + if(p != _subscribers.end()) { - Subscriber::QueueState state = (*p)->queue(forwarded, events); - switch(state) + // If the subscriber is already in the database display a + // diagnostic. + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) { - case Subscriber::QueueStateError: - e.push_back((*p)->id()); - break; - case Subscriber::QueueStateFlush: - flush.push_back(*p); - break; - case Subscriber::QueueStateNoFlush: - break; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(record.id) << ": already subscribed"; } + return; } - // - // Now we add each subscriber to be flushed to the flush manager. - // - if(!flush.empty()) + SubscriberPtr subscriber = Subscriber::create(_instance, record); + for(;;) { - _instance->subscriberPool()->flush(flush); + try + { + Freeze::TransactionHolder txn(_connection); + + SubscriberRecordKey key; + key.topic = _id; + key.id = subscriber->id(); + _subscriberMap.put(SubscriberMap::value_type(key, record)); + + // Update the LLU. + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } } - // - // Run through the error list removing those subscribers that are - // in error from the subscriber list. - // - list<SubscriberPtr> reap; - if(!e.empty()) - { - IceUtil::Mutex::Lock sync(_subscribersMutex); - for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) - { - // - // Its possible for the subscriber to already have been - // removed since the copy is iterated over outside of - // mutex protection. - // - // Note that although this could be quicker if we used a - // map, the most optimal case should be pushing around - // events not searching for a particular subscriber. - // - // The subscriber is immediately destroyed & removed from - // the _subscribers list. If the subscriber is persistent - // its added to an list of error'd subscribers and removed - // from the database on the next reap. - // - vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); - if(q != _subscribers.end()) + _subscribers.push_back(subscriber); +} + +void +TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": remove replica observer: "; + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + if(id != ids.begin()) { - // - // Destroy the subscriber in any case. - // - (*q)->destroy(); - if((*q)->persistent()) - { - reap.push_back(*q); - } - _instance->subscriberPool()->remove(*q); - _subscribers.erase(q); + out << ","; } + out << _instance->communicator()->identityToString(*id); + } + out << " llu: " << llu.generation << "/" << llu.iteration; + } + + IceUtil::Mutex::Lock sync(_subscribersMutex); + + // Remove the subscriber from the subscribers list. If the + // subscriber had a local failure and was removed from the + // subscriber list it could already be gone. That's not a problem. + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _subscribers.erase(p); } } - if(!reap.empty()) + // Next remove from the database. + for(;;) { - // - // This is why _error is a list, so we can splice on the - // reaped subscribers. - // - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.splice(_error.begin(), reap); + try + { + Freeze::TransactionHolder txn(_connection); + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + SubscriberRecordKey key; + key.topic = _id; + key.id = *id; + SubscriberMap::iterator e = _subscriberMap.find(key); + if(e != _subscriberMap.end()) + { + _subscriberMap.erase(e); + } + } + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } } } void -TopicI::removeSubscriber(const Ice::ObjectPrx& obj) +TopicImpl::observerDestroyTopic(const LogUpdate& llu) { - Ice::Identity id = obj->ice_getIdentity(); - IceUtil::Mutex::Lock sync(_subscribersMutex); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); - if(p != _subscribers.end()) + + if(_destroyed) { - (*p)->destroy(); - _instance->subscriberPool()->remove(*p); - _subscribers.erase(p); return; } - - // - // If the subscriber was not found then display a diagnostic. - // + _destroyed = true; + TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(id) << ": not subscribed."; + out << _name << ": destroyed"; + out << " llu: " << llu.generation << "/" << llu.iteration; + } + destroyInternal(llu, false); +} + +Ice::ObjectPtr +TopicImpl::getServant() const +{ + return _servant; +} + +LogUpdate +TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) +{ + _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity()); + _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity()); + + // Destroy each of the subscribers. + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->destroy(); } + _subscribers.clear(); + + // Clear out the database records related to this topic. + LogUpdate llu; + for(;;) + { + try + { + SubscriberRecordKey key; + key.topic = _id; + + Freeze::TransactionHolder txn(_connection); + // Erase all subscriber records and the topic record. + SubscriberMap::iterator p = _subscriberMap.find(key); + while(p != _subscriberMap.end() && p->first.topic == key.topic) + { + _subscriberMap.erase(p++); + } + + // Update the LLU. + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + if(master) + { + llu = ci->second; + llu.iteration++; + } + else + { + llu = origLLU; + } + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + + _instance->topicAdapter()->remove(_id); + + _servant = 0; + + return llu; +} + +void +TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) +{ + Ice::IdentitySeq removed; + + // First remove the subscriber from the subscribers list. Its + // possible that some of these subscribers have already been + // removed (consider, for example, a concurrent reap call from two + // replicas on the same subscriber). To avoid sending unnecessary + // observer updates keep track of the observers that are actually + // removed. + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _subscribers.erase(p); + removed.push_back(*id); + } + } + + // If there is no further work to do we are done. + if(removed.empty()) + { + return; + } + + // Next update the database and send the notification to any + // slaves. + LogUpdate llu; + for(;;) + { + try + { + Freeze::TransactionHolder txn(_connection); + + for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + { + SubscriberRecordKey key; + key.topic = _id; + key.id = *id; + SubscriberMap::iterator e = _subscriberMap.find(key); + if(e != _subscriberMap.end()) + { + _subscriberMap.erase(e); + } + } + + LLUMap::iterator ci = _llumap.find("_manager"); + assert(ci != _llumap.end()); + llu = ci->second; + llu.iteration++; + ci.set(llu); + txn.commit(); + break; + } + catch(const Freeze::DeadlockException&) + { + continue; + } + catch(const Freeze::DatabaseException& ex) + { + halt(_instance->communicator(), ex); + } + } + + _instance->observers()->removeSubscriber(llu, _name, ids); } |