diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2014-08-25 12:51:31 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2014-08-25 12:51:31 -0230 |
commit | 695f36759bc6dfeb66c3a86c5f3473583a620b7f (patch) | |
tree | 00e83692851a9524fd3b835ea572e5f4043c6581 /cpp/src/IceStorm/TopicI.cpp | |
parent | Makedist fixes: (diff) | |
download | ice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.tar.bz2 ice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.tar.xz ice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.zip |
ICE-5649 handle null subscribers in IceStorm subscribe calls
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 62 |
1 files changed, 41 insertions, 21 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 81f404c1b0b..1c46fc2b72e 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -76,7 +76,7 @@ public: //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); Ice::ByteSeq data(inParams.first, inParams.second); event->data.swap(data); - + EventDataSeq v; v.push_back(event); _topic->publish(false, v); @@ -85,7 +85,7 @@ public: } private: - + const TopicImplPtr _topic; const InstancePtr _instance; }; @@ -340,7 +340,7 @@ public: CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); return _impl->getLinkInfoSeq(); } - + virtual Ice::IdentitySeq getSubscribers(const Ice::Current&) const { return _impl->getSubscribers(); @@ -415,7 +415,7 @@ TopicImpl::TopicImpl( try { __setNoDelete(true); - + // TODO: If we want to improve the performance of the // non-replicated case we could allocate a null-topic impl here. _servant = new TopicI(this, instance); @@ -450,7 +450,7 @@ TopicImpl::TopicImpl( _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid); _linkPrx = TopicLinkPrx::uncheckedCast( _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid)); - + // // Re-establish subscribers. // @@ -562,6 +562,16 @@ trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& void TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) { + if(!obj) + { + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "subscribe with null subscriber."; + } + throw NullSubscriber(); + } Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); QoS qos = origQoS; @@ -679,7 +689,7 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } Ice::IdentitySeq ids; ids.push_back(id); @@ -716,7 +726,7 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _subscribers.push_back(subscriber); @@ -727,6 +737,16 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj) Ice::ObjectPrx TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) { + if(!obj) + { + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "subscribe with null subscriber."; + } + throw NullSubscriber(); + } Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -799,7 +819,7 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _subscribers.push_back(subscriber); @@ -829,7 +849,7 @@ TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id); - + if(traceLevels->topic > 1) { out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber); @@ -922,7 +942,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _subscribers.push_back(subscriber); @@ -940,7 +960,7 @@ TopicImpl::unlink(const TopicPrx& topic) } Ice::Identity id = topic->ice_getIdentity(); - + vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p == _subscribers.end()) { @@ -1032,7 +1052,7 @@ Ice::IdentitySeq TopicImpl::getSubscribers() const { IceUtil::Mutex::Lock sync(_subscribersMutex); - + Ice::IdentitySeq subscribers; for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) { @@ -1261,7 +1281,7 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) generation = unlock.generation(); } - + // Tell the master to reap this set of subscribers. This is an // AMI invocation so it shouldn't block the caller (in the // typical case) we do it outside of the mutex lock for @@ -1285,7 +1305,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id); - + if(traceLevels->topic > 1) { out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj) @@ -1344,7 +1364,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _subscribers.push_back(subscriber); @@ -1383,7 +1403,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq _subscribers.erase(p); } } - + // Next remove from the database. for(;;) { @@ -1412,7 +1432,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } } @@ -1487,7 +1507,7 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) // Erase all subscriber records and the topic record. SubscriberMap subscriberMap(connection, subscriberDbName); - + IceStorm::SubscriberRecordKey key; key.topic = _id; SubscriberMap::iterator p = subscriberMap.find(key); @@ -1518,7 +1538,7 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _instance->topicAdapter()->remove(_id); @@ -1571,7 +1591,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) SubscriberRecordKey key; key.topic = _id; key.id = *id; - + SubscriberMap subscriberMap(connection, subscriberDbName); subscriberMap.erase(key); } @@ -1590,7 +1610,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids) catch(const DatabaseException& ex) { halt(_instance->communicator(), ex); - } + } } _instance->observers()->removeSubscriber(llu, _name, ids); |