diff options
Diffstat (limited to 'cpp/src/IceStorm/TransientTopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TransientTopicI.cpp | 297 |
1 files changed, 95 insertions, 202 deletions
diff --git a/cpp/src/IceStorm/TransientTopicI.cpp b/cpp/src/IceStorm/TransientTopicI.cpp index 055d32caf60..1691216eb3c 100644 --- a/cpp/src/IceStorm/TransientTopicI.cpp +++ b/cpp/src/IceStorm/TransientTopicI.cpp @@ -27,32 +27,22 @@ class TransientPublisherI : public Ice::BlobjectArray { public: - TransientPublisherI(const TransientTopicImplPtr& impl) : - _impl(impl) + TransientPublisherI(shared_ptr<TransientTopicImpl> impl) : + _impl(move(impl)) { } - virtual bool - ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - Ice::ByteSeq&, - const Ice::Current& current) + bool + ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams, Ice::ByteSeq&, const Ice::Current& current) override { // Use cached reads. - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); - - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx }; + Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); + event.data.swap(data); EventDataSeq v; - v.push_back(event); + v.push_back(move(event)); _impl->publish(false, v); return true; @@ -60,7 +50,7 @@ public: private: - const TransientTopicImplPtr _impl; + const shared_ptr<TransientTopicImpl> _impl; }; // @@ -71,33 +61,29 @@ class TransientTopicLinkI : public TopicLink { public: - TransientTopicLinkI(const TransientTopicImplPtr& impl) : - _impl(impl) + TransientTopicLinkI(shared_ptr<TransientTopicImpl> impl) : + _impl(move(impl)) { } - virtual void - forward(const EventDataSeq& v, const Ice::Current& /*current*/) + void + forward(EventDataSeq v, const Ice::Current&) override { - _impl->publish(true, v); + _impl->publish(true, move(v)); } private: - const TransientTopicImplPtr _impl; + const shared_ptr<TransientTopicImpl> _impl; }; } -TransientTopicImpl::TransientTopicImpl( - const InstancePtr& instance, - const string& name, - const Ice::Identity& id) : - _instance(instance), - _name(name), - _id(id), - _destroyed(false) +shared_ptr<TransientTopicImpl> +TransientTopicImpl::create(const shared_ptr<Instance>& instance, const std::string& name, const Ice::Identity& id) { + shared_ptr<TransientTopicImpl> topicImpl(new TransientTopicImpl(instance, name, id)); + // // Create a servant per topic to receive event data. If the // category is empty then we are in backwards compatibility @@ -112,24 +98,34 @@ TransientTopicImpl::TransientTopicImpl( Ice::Identity linkid; if(id.category.empty()) { - pubid.category = _name; + pubid.category = name; pubid.name = "publish"; - linkid.category = _name; + linkid.category = name; linkid.name = "link"; } else { pubid.category = id.category; - pubid.name = _name + ".publish"; + pubid.name = name + ".publish"; linkid.category = id.category; - linkid.name = _name + ".link"; + linkid.name = name + ".link"; } - _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid); - _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid)); + auto publisher = make_shared<TransientPublisherI>(topicImpl); + topicImpl->_publisherPrx = instance->publishAdapter()->add(publisher, pubid); + auto topicLink = make_shared<TransientTopicLinkI>(topicImpl); + topicImpl->_linkPrx = Ice::uncheckedCast<TopicLinkPrx>(instance->publishAdapter()->add(topicLink, linkid)); + + return topicImpl; } -TransientTopicImpl::~TransientTopicImpl() +TransientTopicImpl::TransientTopicImpl(shared_ptr<Instance> instance, + const std::string& name, + const Ice::Identity& id) : + _instance(move(instance)), + _name(name), + _id(id), + _destroyed(false) { } @@ -140,128 +136,26 @@ TransientTopicImpl::getName(const Ice::Current&) const return _name; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TransientTopicImpl::getPublisher(const Ice::Current&) const { // Immutable return _publisherPrx; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current&) const { // Immutable return _publisherPrx; } -void -TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) -{ - if(!obj) - { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << ": subscribe: null proxy"; - } - throw InvalidSubscriber("subscriber is a null proxy"); - } - Ice::Identity id = obj->ice_getIdentity(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); - QoS qos = origQoS; - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << ": subscribe: " << _instance->communicator()->identityToString(id); - - if(traceLevels->topic > 1) - { - out << " endpoints: " << IceStormInternal::describeEndpoints(obj) - << " QoS: "; - for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) - { - if(p != qos.begin()) - { - out << ','; - } - out << '[' << p->first << "," << p->second << ']'; - } - } - } - - string reliability = "oneway"; - { - QoS::iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - qos.erase(p); - } - } - - Ice::ObjectPrx newObj = obj; - if(reliability == "batch") - { - if(newObj->ice_isDatagram()) - { - newObj = newObj->ice_batchDatagram(); - } - else - { - newObj = newObj->ice_batchOneway(); - } - } - else if(reliability == "twoway") - { - newObj = newObj->ice_twoway(); - } - else if(reliability == "twoway ordered") - { - qos["reliability"] = "ordered"; - newObj = newObj->ice_twoway(); - } - else // reliability == "oneway" - { - if(reliability != "oneway" && traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << reliability <<" mode not understood."; - } - if(!newObj->ice_isDatagram()) - { - newObj = newObj->ice_oneway(); - } - } - - Lock sync(*this); - SubscriberRecord record; - record.id = id; - record.obj = newObj; - record.theQoS = qos; - record.topicName = _name; - record.link = false; - record.cost = 0; - - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) - { - // If we already have this subscriber remove it from our - // subscriber list and remove it from the database. - (*p)->destroy(); - _subscribers.erase(p); - } - - SubscriberPtr subscriber = Subscriber::create(_instance, record); - _subscribers.push_back(subscriber); -} - -Ice::ObjectPrx -TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +shared_ptr<Ice::ObjectPrx> +TransientTopicImpl::subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj, const Ice::Current&) { if(!obj) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -271,7 +165,7 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr } Ice::Identity id = obj->ice_getIdentity(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -292,7 +186,7 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr } } - Lock sync(*this); + lock_guard<mutex> lg(_mutex); SubscriberRecord record; record.id = id; @@ -302,22 +196,21 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr record.link = false; record.cost = 0; - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end()) { throw AlreadySubscribed(); } - SubscriberPtr subscriber = Subscriber::create(_instance, record); + auto subscriber = Subscriber::create(_instance, record); _subscribers.push_back(subscriber); return subscriber->proxy(); } void -TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) +TransientTopicImpl::unsubscribe(shared_ptr<Ice::ObjectPrx> subscriber, const Ice::Current&) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(!subscriber) { if(traceLevels->topic > 0) @@ -340,11 +233,12 @@ TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Cur } } - Lock sync(*this); + lock_guard<mutex> lg(_mutex); + // First remove the subscriber from the subscribers list. Note // that its possible that the subscriber isn't in the list, but is // in the database if the subscriber was locally reaped. - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); + auto p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { (*p)->destroy(); @@ -352,7 +246,7 @@ TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Cur } } -TopicLinkPrx +shared_ptr<TopicLinkPrx> TransientTopicImpl::getLinkProxy(const Ice::Current&) { // immutable @@ -360,12 +254,12 @@ TransientTopicImpl::getLinkProxy(const Ice::Current&) } void -TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) +TransientTopicImpl::link(shared_ptr<TopicPrx> topic, int cost, const Ice::Current&) { - TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic); - TopicLinkPrx link = internal->getLinkProxy(); + auto internal = Ice::uncheckedCast<TopicInternalPrx>(topic); + auto link = internal->getLinkProxy(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -373,9 +267,9 @@ TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Curren << " cost " << cost; } - Lock sync(*this); + lock_guard<mutex> lg(_mutex); - Ice::Identity id = topic->ice_getIdentity(); + auto id = topic->ice_getIdentity(); SubscriberRecord record; record.id = id; @@ -385,32 +279,32 @@ TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Curren record.link = true; record.cost = cost; - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); - if(p != _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end()) { throw LinkExists(IceStormInternal::identityToTopicName(id)); } - SubscriberPtr subscriber = Subscriber::create(_instance, record); + auto subscriber = Subscriber::create(_instance, record); _subscribers.push_back(subscriber); } void -TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&) +TransientTopicImpl::unlink(shared_ptr<TopicPrx> topic, const Ice::Current&) { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); + if(_destroyed) { throw Ice::ObjectNotExistException(__FILE__, __LINE__); } - Ice::Identity id = topic->ice_getIdentity(); + auto id = topic->ice_getIdentity(); + auto traceLevels = _instance->traceLevels(); - vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); - if(p == _subscribers.end()) + if(find(_subscribers.begin(), _subscribers.end(), id) == _subscribers.end()) { string name = IceStormInternal::identityToTopicName(id); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -419,7 +313,6 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&) throw NoSuchLink(name); } - TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -429,7 +322,7 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&) // Remove the subscriber from the subscribers list. Note // that its possible that the subscriber isn't in the list, but is // in the database if the subscriber was locally reaped. - p = find(_subscribers.begin(), _subscribers.end(), id); + auto p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { (*p)->destroy(); @@ -440,12 +333,13 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&) LinkInfoSeq TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); + LinkInfoSeq seq; - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - SubscriberRecord record = (*p)->record(); - if(record.link && !(*p)->errored()) + SubscriberRecord record = subscriber->record(); + if(record.link && !subscriber->errored()) { LinkInfo info; info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity()); @@ -460,12 +354,12 @@ TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const Ice::IdentitySeq TransientTopicImpl::getSubscribers(const Ice::Current&) const { - IceUtil::Mutex::Lock sync(*this); + lock_guard<mutex> lg(_mutex); Ice::IdentitySeq subscribers; - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - subscribers.push_back((*p)->id()); + subscribers.push_back(subscriber->id()); } return subscribers; } @@ -473,7 +367,7 @@ TransientTopicImpl::getSubscribers(const Ice::Current&) const void TransientTopicImpl::destroy(const Ice::Current&) { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); if(_destroyed) { @@ -481,7 +375,7 @@ TransientTopicImpl::destroy(const Ice::Current&) } _destroyed = true; - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -499,22 +393,22 @@ TransientTopicImpl::destroy(const Ice::Current&) } // Destroy all of the subscribers. - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - (*p)->destroy(); + subscriber->destroy(); } _subscribers.clear(); } void -TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&) +TransientTopicImpl::reap(Ice::IdentitySeq, const Ice::Current&) { } bool TransientTopicImpl::destroyed() const { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); return _destroyed; } @@ -532,9 +426,9 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) // Copy of the subscriber list so that event publishing can occur // in parallel. // - vector<SubscriberPtr> copy; + vector<shared_ptr<Subscriber>> copy; { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); copy = _subscribers; } @@ -542,12 +436,12 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) // Queue each event, gathering a list of those subscribers that // must be reaped. // - vector<Ice::Identity> e; - for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) + vector<Ice::Identity> ids; + for(const auto& subscriber : copy) { - if(!(*p)->queue(forwarded, events) && (*p)->reap()) + if(!subscriber->queue(forwarded, events) && subscriber->reap()) { - e.push_back((*p)->id()); + ids.push_back(subscriber->id()); } } @@ -555,10 +449,10 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) // Run through the error list removing those subscribers that are // in error from the subscriber list. // - if(!e.empty()) + if(!ids.empty()) { - Lock sync(*this); - for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) + lock_guard<mutex> lg(_mutex); + for(const auto& id : ids) { // // Its possible for the subscriber to already have been @@ -574,14 +468,13 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) // error'd subscribers and remove it from the database on // the next reap. // - vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); + auto q = find(_subscribers.begin(), _subscribers.end(), id); if(q != _subscribers.end()) { - SubscriberPtr subscriber = *q; // // Destroy the subscriber. // - subscriber->destroy(); + (*q)->destroy(); _subscribers.erase(q); } } @@ -591,11 +484,11 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) void TransientTopicImpl::shutdown() { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); // Shutdown each subscriber. This waits for the event queues to drain. - for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + for(const auto& subscriber : _subscribers) { - (*p)->shutdown(); + subscriber->shutdown(); } } |