diff options
author | Joe George <joe@zeroc.com> | 2021-01-28 16:26:44 -0500 |
---|---|---|
committer | Joe George <joe@zeroc.com> | 2021-02-01 16:59:30 -0500 |
commit | 92a6531e409f2691d82591e185a92299d415fc0f (patch) | |
tree | 60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceStorm/TopicI.cpp | |
parent | Port Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff) | |
download | ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2 ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz ice-92a6531e409f2691d82591e185a92299d415fc0f.zip |
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 465 |
1 files changed, 224 insertions, 241 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 480189c6387..5dcaff63153 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -21,7 +21,7 @@ namespace { void -logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex) +logError(const shared_ptr<Ice::Communicator>& com, const IceDB::LMDBException& ex) { Ice::Error error(com->getLogger()); error << "LMDB error: " << ex; @@ -35,28 +35,24 @@ class PublisherI : public Ice::BlobjectArray { public: - PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) : - _topic(topic), _instance(instance) + PublisherI(shared_ptr<TopicImpl> topic, shared_ptr<PersistentInstance> instance) : + _topic(move(topic)), _instance(move(instance)) { } - virtual bool - ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + bool + ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams, Ice::ByteSeq&, - const Ice::Current& current) + const Ice::Current& current) override { // The publish call does a cached read. - EventDataPtr event = new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx); + EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx }; - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); + event.data = move(data); EventDataSeq v; - v.push_back(event); + v.push_back(move(event)); _topic->publish(false, v); return true; @@ -64,8 +60,8 @@ public: private: - const TopicImplPtr _topic; - const PersistentInstancePtr _instance; + const shared_ptr<TopicImpl> _topic; + const shared_ptr<PersistentInstance> _instance; }; // @@ -76,66 +72,66 @@ class TopicLinkI : public TopicLink { public: - TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) : - _impl(impl), _instance(instance) + TopicLinkI(shared_ptr<TopicImpl> impl, shared_ptr<PersistentInstance> instance) : + _impl(move(impl)), _instance(move(instance)) { } - virtual void - forward(const EventDataSeq& v, const Ice::Current& /*current*/) + void + forward(EventDataSeq v, const Ice::Current&) override { // The publish call does a cached read. - _impl->publish(true, v); + _impl->publish(true, move(v)); } private: - const TopicImplPtr _impl; - const PersistentInstancePtr _instance; + const shared_ptr<TopicImpl> _impl; + const shared_ptr<PersistentInstance> _instance; }; class TopicI : public TopicInternal { public: - TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) : - _impl(impl), _instance(instance) + TopicI(shared_ptr<TopicImpl> impl, shared_ptr<PersistentInstance> instance) : + _impl(move(impl)), _instance(move(instance)) { } - virtual string getName(const Ice::Current&) const + string getName(const Ice::Current&) const override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getName(); } - virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const + shared_ptr<Ice::ObjectPrx> getPublisher(const Ice::Current&) const override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getPublisher(); } - virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const + shared_ptr<Ice::ObjectPrx> getNonReplicatedPublisher(const Ice::Current&) const override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getNonReplicatedPublisher(); } - virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, - const Ice::Current& current) + shared_ptr<Ice::ObjectPrx> subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj, + const Ice::Current& current) override { while(true) { Ice::Long generation = -1; - TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + auto master = getMasterFor(current, generation, __FILE__, __LINE__); if(master) { try { - return master->subscribeAndGetPublisher(qos, obj); + return master->subscribeAndGetPublisher(move(qos), move(obj)); } catch(const Ice::ConnectFailedException&) { @@ -151,22 +147,22 @@ public: else { FinishUpdateHelper unlock(_instance->node()); - return _impl->subscribeAndGetPublisher(qos, obj); + return _impl->subscribeAndGetPublisher(move(qos), move(obj)); } } } - virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current) + void unsubscribe(shared_ptr<Ice::ObjectPrx> subscriber, const Ice::Current& current) override { while(true) { Ice::Long generation = -1; - TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + auto master = getMasterFor(current, generation, __FILE__, __LINE__); if(master) { try { - master->unsubscribe(subscriber); + master->unsubscribe(move(subscriber)); } catch(const Ice::ConnectFailedException&) { @@ -182,22 +178,22 @@ public: else { FinishUpdateHelper unlock(_instance->node()); - _impl->unsubscribe(subscriber); + _impl->unsubscribe(move(subscriber)); } break; } } - virtual TopicLinkPrx getLinkProxy(const Ice::Current&) + shared_ptr<TopicLinkPrx> getLinkProxy(const Ice::Current&) override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getLinkProxy(); } - virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& /*current*/) + void reap(Ice::IdentitySeq ids, const Ice::Current&) override { - NodeIPtr node = _instance->node(); + auto node = _instance->node(); if(!node->updateMaster(__FILE__, __LINE__)) { throw ReapWouldBlock(); @@ -206,17 +202,17 @@ public: _impl->reap(ids); } - virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current) + void link(shared_ptr<TopicPrx> topic, int cost, const Ice::Current& current) override { while(true) { Ice::Long generation = -1; - TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + auto master = getMasterFor(current, generation, __FILE__, __LINE__); if(master) { try { - master->link(topic, cost); + master->link(move(topic), cost); } catch(const Ice::ConnectFailedException&) { @@ -232,23 +228,23 @@ public: else { FinishUpdateHelper unlock(_instance->node()); - _impl->link(topic, cost); + _impl->link(move(topic), cost); } break; } } - virtual void unlink(const TopicPrx& topic, const Ice::Current& current) + void unlink(shared_ptr<TopicPrx> topic, const Ice::Current& current) override { while(true) { Ice::Long generation = -1; - TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + auto master = getMasterFor(current, generation, __FILE__, __LINE__); if(master) { try { - master->unlink(topic); + master->unlink(move(topic)); } catch(const Ice::ConnectFailedException&) { @@ -264,30 +260,30 @@ public: else { FinishUpdateHelper unlock(_instance->node()); - _impl->unlink(topic); + _impl->unlink(move(topic)); } break; } } - virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const + LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getLinkInfoSeq(); } - virtual Ice::IdentitySeq getSubscribers(const Ice::Current&) const + Ice::IdentitySeq getSubscribers(const Ice::Current&) const override { return _impl->getSubscribers(); } - virtual void destroy(const Ice::Current& current) + void destroy(const Ice::Current& current) override { while(true) { Ice::Long generation = -1; - TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__); + auto master = getMasterFor(current, generation, __FILE__, __LINE__); if(master) { try @@ -316,29 +312,72 @@ public: private: - TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const + shared_ptr<TopicPrx> getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const { - NodeIPtr node = _instance->node(); - Ice::ObjectPrx master; + auto node = _instance->node(); + shared_ptr<Ice::ObjectPrx> master; if(node) { master = _instance->node()->startUpdate(generation, file, line); } - return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx(); + return master ? Ice::uncheckedCast<TopicPrx>(master->ice_identity(cur.id)) : nullptr; } - const TopicImplPtr _impl; - const PersistentInstancePtr _instance; + const shared_ptr<TopicImpl> _impl; + const shared_ptr<PersistentInstance> _instance; }; } -TopicImpl::TopicImpl( - const PersistentInstancePtr& instance, - const string& name, - const Ice::Identity& id, - const SubscriberRecordSeq& subscribers) : - _instance(instance), +shared_ptr<TopicImpl> +TopicImpl::create(shared_ptr<PersistentInstance> instance, + const string& name, + const Ice::Identity& id, + const SubscriberRecordSeq& subscribers) +{ + shared_ptr<TopicImpl> topicImpl(new TopicImpl(instance, name, id, subscribers)); + + topicImpl->_servant = make_shared<TopicI>(topicImpl, instance); + // + // Create a servant per topic to receive event data. If the + // category is empty then we are in backwards compatibility + // mode. In this case the servant's identity is + // category=<topicname>, name=publish, otherwise the name is + // <instancename>/<topicname>.publish. The same applies to the + // link proxy. + // + // Activate the object and save a reference to give to publishers. + // + Ice::Identity pubid; + Ice::Identity linkid; + if(id.category.empty()) + { + pubid.category = name; + pubid.name = "publish"; + linkid.category = name; + linkid.name = "link"; + } + else + { + pubid.category = id.category; + pubid.name = name + ".publish"; + linkid.category = id.category; + linkid.name = name + ".link"; + } + + auto publisher = make_shared<PublisherI>(topicImpl, instance); + topicImpl->_publisherPrx = instance->publishAdapter()->add(publisher, pubid); + auto topicLink = make_shared<TopicLinkI>(topicImpl, instance); + topicImpl->_linkPrx = Ice::uncheckedCast<TopicLinkPrx>(instance->publishAdapter()->add(topicLink, linkid)); + + return topicImpl; +} + +TopicImpl::TopicImpl(shared_ptr<PersistentInstance> instance, + const string& name, + const Ice::Identity& id, + const SubscriberRecordSeq& subscribers) : + _instance(move(instance)), _name(name), _id(id), _destroyed(false), @@ -347,57 +386,20 @@ TopicImpl::TopicImpl( { try { - __setNoDelete(true); - - // TODO: If we want to improve the performance of the - // non-replicated case we could allocate a null-topic impl here. - _servant = new TopicI(this, instance); - - // - // Create a servant per topic to receive event data. If the - // category is empty then we are in backwards compatibility - // mode. In this case the servant's identity is - // category=<topicname>, name=publish, otherwise the name is - // <instancename>/<topicname>.publish. The same applies to the - // link proxy. - // - // Activate the object and save a reference to give to publishers. - // - Ice::Identity pubid; - Ice::Identity linkid; - if(id.category.empty()) - { - pubid.category = _name; - pubid.name = "publish"; - linkid.category = _name; - linkid.name = "link"; - } - else - { - pubid.category = id.category; - pubid.name = _name + ".publish"; - linkid.category = id.category; - linkid.name = _name + ".link"; - } - - _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid); - _linkPrx = TopicLinkPrx::uncheckedCast( - _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid)); - // // Re-establish subscribers. // - for(SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p) + for(const auto& subscriber : subscribers) { - Ice::Identity ident = p->obj->ice_getIdentity(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + Ice::Identity ident = subscriber.obj->ice_getIdentity(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); out << _name << " recreate " << _instance->communicator()->identityToString(ident); if(traceLevels->topic > 1) { - out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj); + out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber.obj); } } @@ -407,8 +409,7 @@ TopicImpl::TopicImpl( // Create the subscriber object add it to the set of // subscribers. // - SubscriberPtr subscriber = Subscriber::create(_instance, *p); - _subscribers.push_back(subscriber); + _subscribers.push_back(Subscriber::create(_instance, subscriber)); } catch(const Ice::Exception& ex) { @@ -416,7 +417,7 @@ TopicImpl::TopicImpl( out << _name << " recreate " << _instance->communicator()->identityToString(ident); if(traceLevels->topic > 1) { - out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj); + out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber.obj); } out << " failed: " << ex; } @@ -424,16 +425,14 @@ TopicImpl::TopicImpl( if(_instance->observer()) { - _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0)); + _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, nullptr)); } } - catch(...) + catch(const std::exception&) { shutdown(); - __setNoDelete(false); throw; } - __setNoDelete(false); } string @@ -443,7 +442,7 @@ TopicImpl::getName() const return _name; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TopicImpl::getPublisher() const { // Immutable @@ -454,7 +453,7 @@ TopicImpl::getPublisher() const return _publisherPrx; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TopicImpl::getNonReplicatedPublisher() const { // If there is an adapter id configured then we're using icegrid @@ -472,10 +471,10 @@ TopicImpl::getNonReplicatedPublisher() const namespace { void -trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s) +trace(Ice::Trace& out, const shared_ptr<PersistentInstance>& instance, const vector<shared_ptr<Subscriber>>& s) { out << '['; - for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p) + for(auto p = s.cbegin(); p != s.cend(); ++p) { if(p != s.begin()) { @@ -487,12 +486,12 @@ trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<Subsc } } -Ice::ObjectPrx -TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) +shared_ptr<Ice::ObjectPrx> +TopicImpl::subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj) { if(!obj) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -500,9 +499,9 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) } throw InvalidSubscriber("subscriber is a null proxy"); } - Ice::Identity id = obj->ice_getIdentity(); + auto id = obj->ice_getIdentity(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -525,7 +524,7 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) } } - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); SubscriberRecord record; record.id = id; @@ -535,15 +534,14 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) record.link = false; record.cost = 0; - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end()) { throw AlreadySubscribed(); } LogUpdate llu; - SubscriberPtr subscriber = Subscriber::create(_instance, record); + auto subscriber = Subscriber::create(_instance, record); try { IceDB::ReadWriteTxn txn(_instance->dbEnv()); @@ -572,9 +570,9 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) } void -TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber) +TopicImpl::unsubscribe(const shared_ptr<Ice::ObjectPrx>& subscriber) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(!subscriber) { if(traceLevels->topic > 0) @@ -599,31 +597,32 @@ TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber) } } - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); + Ice::IdentitySeq ids; ids.push_back(id); removeSubscribers(ids); } -TopicLinkPrx +shared_ptr<TopicLinkPrx> TopicImpl::getLinkProxy() { // immutable if(_instance->publisherReplicaProxy()) { - return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity( + return Ice::uncheckedCast<TopicLinkPrx>(_instance->publisherReplicaProxy()->ice_identity( _linkPrx->ice_getIdentity())); } return _linkPrx; } void -TopicImpl::link(const TopicPrx& topic, Ice::Int cost) +TopicImpl::link(const shared_ptr<TopicPrx>& topic, int cost) { - TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic); - TopicLinkPrx link = internal->getLinkProxy(); + auto internal = Ice::uncheckedCast<TopicInternalPrx>(topic); + auto link = internal->getLinkProxy(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -631,7 +630,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) << " cost " << cost; } - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); Ice::Identity id = topic->ice_getIdentity(); @@ -643,8 +642,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) record.link = true; record.cost = cost; - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end()) { string name = IceStormInternal::identityToTopicName(id); throw LinkExists(name); @@ -652,7 +650,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) LogUpdate llu; - SubscriberPtr subscriber = Subscriber::create(_instance, record); + auto subscriber = Subscriber::create(_instance, record); try { @@ -680,9 +678,10 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) } void -TopicImpl::unlink(const TopicPrx& topic) +TopicImpl::unlink(const shared_ptr<TopicPrx>& topic) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__); @@ -690,11 +689,12 @@ TopicImpl::unlink(const TopicPrx& topic) Ice::Identity id = topic->ice_getIdentity(); - vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id); - if(p == _subscribers.end()) + auto traceLevels = _instance->traceLevels(); + + if(find(_subscribers.begin(), _subscribers.end(), id) == _subscribers.end()) { string name = IceStormInternal::identityToTopicName(id); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -704,7 +704,6 @@ TopicImpl::unlink(const TopicPrx& topic) throw NoSuchLink(name); } - TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -719,9 +718,9 @@ TopicImpl::unlink(const TopicPrx& topic) void TopicImpl::reap(const Ice::IdentitySeq& ids) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -742,13 +741,14 @@ TopicImpl::reap(const Ice::IdentitySeq& ids) void TopicImpl::shutdown() { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); + _servant = 0; // Shutdown each subscriber. This waits for the event queues to drain. - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - (*p)->shutdown(); + subscriber->shutdown(); } _observer.detach(); @@ -757,13 +757,13 @@ TopicImpl::shutdown() LinkInfoSeq TopicImpl::getLinkInfoSeq() const { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); LinkInfoSeq seq; - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - SubscriberRecord record = (*p)->record(); - if(record.link && !(*p)->errored()) + SubscriberRecord record = subscriber->record(); + if(record.link && !subscriber->errored()) { LinkInfo info; info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity()); @@ -778,12 +778,12 @@ TopicImpl::getLinkInfoSeq() const Ice::IdentitySeq TopicImpl::getSubscribers() const { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); Ice::IdentitySeq subscribers; - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - subscribers.push_back((*p)->id()); + subscribers.push_back(subscriber->id()); } return subscribers; } @@ -791,7 +791,7 @@ TopicImpl::getSubscribers() const void TopicImpl::destroy() { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); if(_destroyed) { @@ -799,7 +799,7 @@ TopicImpl::destroy() } _destroyed = true; - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -807,7 +807,7 @@ TopicImpl::destroy() } // destroyInternal clears out the topic content. - LogUpdate llu = {0,0}; + LogUpdate llu = { 0,0 }; _instance->observers()->destroyTopic(destroyInternal(llu, true), _name); _observer.detach(); @@ -816,18 +816,18 @@ TopicImpl::destroy() TopicContent TopicImpl::getContent() const { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); TopicContent content; content.id = _id; - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { // Don't return errored subscribers (subscribers that have // errored out, but not reaped due to a failure with the // master). This means we can avoid the reaping step later. - if(!(*p)->errored()) + if(!subscriber->errored()) { - content.records.push_back((*p)->record()); + content.records.push_back(subscriber->record()); } } return content; @@ -836,7 +836,7 @@ TopicImpl::getContent() const void TopicImpl::update(const SubscriberRecordSeq& records) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); // We do this with two scans. The first runs through the subscribers // that we have and removes those not in the init list. The second @@ -844,7 +844,7 @@ TopicImpl::update(const SubscriberRecordSeq& records) // exist. { - vector<SubscriberPtr>::iterator p = _subscribers.begin(); + auto p = _subscribers.begin(); while(p != _subscribers.end()) { SubscriberRecordSeq::const_iterator q; @@ -871,19 +871,19 @@ TopicImpl::update(const SubscriberRecordSeq& records) } } - for(SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p) + for(const auto& record : records) { - vector<SubscriberPtr>::iterator q; + vector<shared_ptr<Subscriber>>::iterator q; for(q = _subscribers.begin(); q != _subscribers.end(); ++q) { - if((*q)->id() == p->id) + if((*q)->id() == record.id) { break; } } if(q == _subscribers.end()) { - SubscriberPtr subscriber = Subscriber::create(_instance, *p); + auto subscriber = Subscriber::create(_instance, record); _subscribers.push_back(subscriber); } } @@ -892,7 +892,8 @@ TopicImpl::update(const SubscriberRecordSeq& records) bool TopicImpl::destroyed() const { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lg(_subscribersMutex); + return _destroyed; } @@ -903,11 +904,11 @@ TopicImpl::id() const return _id; } -TopicPrx +shared_ptr<TopicPrx> TopicImpl::proxy() const { // immutable - Ice::ObjectPrx prx; + shared_ptr<Ice::ObjectPrx> prx; if(_instance->topicReplicaProxy()) { prx = _instance->topicReplicaProxy()->ice_identity(_id); @@ -916,44 +917,13 @@ TopicImpl::proxy() const { prx = _instance->topicAdapter()->createProxy(_id); } - return TopicPrx::uncheckedCast(prx); -} - -namespace -{ - -class TopicInternalReapCB : public IceUtil::Shared -{ -public: - - TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) : - _instance(instance), _generation(generation) - { - } - - virtual void exception(const Ice::Exception& ex) - { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "exception when calling `reap' on the master replica: " << ex; - } - _instance->node()->recovery(_generation); - } - -private: - - const PersistentInstancePtr _instance; - const Ice::Long _generation; -}; - + return Ice::uncheckedCast<TopicPrx>(prx); } void TopicImpl::publish(bool forwarded, const EventDataSeq& events) { - TopicInternalPrx masterInternal; + shared_ptr<TopicInternalPrx> masterInternal; Ice::Long generation = -1; Ice::IdentitySeq reap; { @@ -964,9 +934,10 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) // Copy of the subscriber list so that event publishing can occur // in parallel. // - vector<SubscriberPtr> copy; + vector<shared_ptr<Subscriber>> copy; { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); + if(_observer) { if(forwarded) @@ -985,11 +956,11 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) // Queue each event, gathering a list of those subscribers that // must be reaped. // - for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) + for(const auto& subscriber : copy) { - if(!(*p)->queue(forwarded, events) && (*p)->reap()) + if(!subscriber->queue(forwarded, events) && subscriber->reap()) { - reap.push_back((*p)->id()); + reap.push_back(subscriber->id()); } } @@ -1000,11 +971,11 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) } if(!unlock.getMaster()) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); removeSubscribers(reap); return; } - masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id)); + masterInternal = Ice::uncheckedCast<TopicInternalPrx>(unlock.getMaster()->ice_identity(_id)); generation = unlock.generation(); } @@ -1017,16 +988,32 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) // call may raise an exception in the caller (that is directly // call ice_exception) which calls recover() on the node which // would result in a deadlock since the node is locked. - masterInternal->begin_reap(reap, newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation), - &TopicInternalReapCB::exception)); + + masterInternal->reapAsync(reap, nullptr, [instance = _instance, generation](exception_ptr ex) + { + auto traceLevels = instance->traceLevels(); + if(traceLevels->topic > 0) + { + try + { + rethrow_exception(ex); + } + catch(const std::exception& e) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "exception when calling `reap' on the master replica: " << e; + } + } + instance->node()->recovery(generation); + }); } void TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& record) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -1048,8 +1035,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r out << " llu: " << llu.generation << "/" << llu.iteration; } - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end()) { // If the subscriber is already in the database display a // diagnostic. @@ -1061,7 +1047,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r return; } - SubscriberPtr subscriber = Subscriber::create(_instance, record); + auto subscriber = Subscriber::create(_instance, record); try { IceDB::ReadWriteTxn txn(_instance->dbEnv()); @@ -1089,7 +1075,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r void TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -1105,19 +1091,16 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq out << " llu: " << llu.generation << "/" << llu.iteration; } - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); // First remove from the database. try { IceDB::ReadWriteTxn txn(_instance->dbEnv()); - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + for(const auto& id : ids) { - SubscriberRecordKey key; - key.topic = _id; - key.id = *id; - + SubscriberRecordKey key = { _id, id }; _subscriberMap.del(txn, key); } @@ -1134,9 +1117,9 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq // Then remove the subscriber from the subscribers list. If the // subscriber had a local failure and was removed from the // subscriber list it could already be gone. That's not a problem. - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + for(const auto& id : ids) { - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); + auto p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { (*p)->destroy(); @@ -1148,7 +1131,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq void TopicImpl::observerDestroyTopic(const LogUpdate& llu) { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); if(_destroyed) { @@ -1156,7 +1139,7 @@ TopicImpl::observerDestroyTopic(const LogUpdate& llu) } _destroyed = true; - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -1166,7 +1149,7 @@ TopicImpl::observerDestroyTopic(const LogUpdate& llu) destroyInternal(llu, false); } -Ice::ObjectPtr +shared_ptr<Ice::Object> TopicImpl::getServant() const { return _servant; @@ -1175,7 +1158,8 @@ TopicImpl::getServant() const void TopicImpl::updateObserver() { - IceUtil::Mutex::Lock sync(_subscribersMutex); + lock_guard<mutex> lock(_subscribersMutex); + if(_instance->observer()) { _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get())); @@ -1185,10 +1169,11 @@ TopicImpl::updateObserver() void TopicImpl::updateSubscriberObservers() { - IceUtil::Mutex::Lock sync(_subscribersMutex); - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + lock_guard<mutex> lock(_subscribersMutex); + + for(const auto& subscriber : _subscribers) { - (*p)->updateObserver(); + subscriber->updateObserver(); } } @@ -1243,9 +1228,9 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) _instance->topicReaper()->add(_name); // Destroy each of the subscribers. - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - (*p)->destroy(); + subscriber->destroy(); } _subscribers.clear(); @@ -1267,11 +1252,9 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) { IceDB::ReadWriteTxn txn(_instance->dbEnv()); - for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) + for(const auto& id : ids) { - SubscriberRecordKey key; - key.topic = _id; - key.id = *id; + SubscriberRecordKey key = { _id, id }; if(_subscriberMap.del(txn, key)) { @@ -1305,7 +1288,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) // removed. for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id) { - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id); + auto p = find(_subscribers.begin(), _subscribers.end(), *id); if(p != _subscribers.end()) { (*p)->destroy(); |