summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2014-08-25 12:51:31 -0230
committerDwayne Boone <dwayne@zeroc.com>2014-08-25 12:51:31 -0230
commit695f36759bc6dfeb66c3a86c5f3473583a620b7f (patch)
tree00e83692851a9524fd3b835ea572e5f4043c6581 /cpp/src
parentMakedist fixes: (diff)
downloadice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.tar.bz2
ice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.tar.xz
ice-695f36759bc6dfeb66c3a86c5f3473583a620b7f.zip
ICE-5649 handle null subscribers in IceStorm subscribe calls
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp62
-rw-r--r--cpp/src/IceStorm/TransientTopicI.cpp30
2 files changed, 66 insertions, 26 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 81f404c1b0b..1c46fc2b72e 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -76,7 +76,7 @@ public:
//event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
Ice::ByteSeq data(inParams.first, inParams.second);
event->data.swap(data);
-
+
EventDataSeq v;
v.push_back(event);
_topic->publish(false, v);
@@ -85,7 +85,7 @@ public:
}
private:
-
+
const TopicImplPtr _topic;
const InstancePtr _instance;
};
@@ -340,7 +340,7 @@ public:
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getLinkInfoSeq();
}
-
+
virtual Ice::IdentitySeq getSubscribers(const Ice::Current&) const
{
return _impl->getSubscribers();
@@ -415,7 +415,7 @@ TopicImpl::TopicImpl(
try
{
__setNoDelete(true);
-
+
// TODO: If we want to improve the performance of the
// non-replicated case we could allocate a null-topic impl here.
_servant = new TopicI(this, instance);
@@ -450,7 +450,7 @@ TopicImpl::TopicImpl(
_publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
_linkPrx = TopicLinkPrx::uncheckedCast(
_instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
-
+
//
// Re-establish subscribers.
//
@@ -562,6 +562,16 @@ trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>&
void
TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
+ if(!obj)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "subscribe with null subscriber.";
+ }
+ throw NullSubscriber();
+ }
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
QoS qos = origQoS;
@@ -679,7 +689,7 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
Ice::IdentitySeq ids;
ids.push_back(id);
@@ -716,7 +726,7 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_subscribers.push_back(subscriber);
@@ -727,6 +737,16 @@ TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
Ice::ObjectPrx
TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
{
+ if(!obj)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "subscribe with null subscriber.";
+ }
+ throw NullSubscriber();
+ }
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -799,7 +819,7 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_subscribers.push_back(subscriber);
@@ -829,7 +849,7 @@ TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
-
+
if(traceLevels->topic > 1)
{
out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
@@ -922,7 +942,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_subscribers.push_back(subscriber);
@@ -940,7 +960,7 @@ TopicImpl::unlink(const TopicPrx& topic)
}
Ice::Identity id = topic->ice_getIdentity();
-
+
vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p == _subscribers.end())
{
@@ -1032,7 +1052,7 @@ Ice::IdentitySeq
TopicImpl::getSubscribers() const
{
IceUtil::Mutex::Lock sync(_subscribersMutex);
-
+
Ice::IdentitySeq subscribers;
for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
{
@@ -1261,7 +1281,7 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
generation = unlock.generation();
}
-
+
// Tell the master to reap this set of subscribers. This is an
// AMI invocation so it shouldn't block the caller (in the
// typical case) we do it outside of the mutex lock for
@@ -1285,7 +1305,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
-
+
if(traceLevels->topic > 1)
{
out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj)
@@ -1344,7 +1364,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_subscribers.push_back(subscriber);
@@ -1383,7 +1403,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
_subscribers.erase(p);
}
}
-
+
// Next remove from the database.
for(;;)
{
@@ -1412,7 +1432,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
}
@@ -1487,7 +1507,7 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
// Erase all subscriber records and the topic record.
SubscriberMap subscriberMap(connection, subscriberDbName);
-
+
IceStorm::SubscriberRecordKey key;
key.topic = _id;
SubscriberMap::iterator p = subscriberMap.find(key);
@@ -1518,7 +1538,7 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_instance->topicAdapter()->remove(_id);
@@ -1571,7 +1591,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
SubscriberRecordKey key;
key.topic = _id;
key.id = *id;
-
+
SubscriberMap subscriberMap(connection, subscriberDbName);
subscriberMap.erase(key);
}
@@ -1590,7 +1610,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
catch(const DatabaseException& ex)
{
halt(_instance->communicator(), ex);
- }
+ }
}
_instance->observers()->removeSubscriber(llu, _name, ids);
diff --git a/cpp/src/IceStorm/TransientTopicI.cpp b/cpp/src/IceStorm/TransientTopicI.cpp
index 554fd6bee0b..0ee6f7d724f 100644
--- a/cpp/src/IceStorm/TransientTopicI.cpp
+++ b/cpp/src/IceStorm/TransientTopicI.cpp
@@ -56,7 +56,7 @@ public:
//event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
Ice::ByteSeq data(inParams.first, inParams.second);
event->data.swap(data);
-
+
EventDataSeq v;
v.push_back(event);
_impl->publish(false, v);
@@ -65,7 +65,7 @@ public:
}
private:
-
+
const TransientTopicImplPtr _impl;
};
@@ -168,6 +168,16 @@ TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current&) const
void
TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
{
+ if(!obj)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "subscribe with null subscriber.";
+ }
+ throw NullSubscriber();
+ }
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
QoS qos = origQoS;
@@ -175,7 +185,7 @@ TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, con
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
-
+
if(traceLevels->topic > 1)
{
out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
@@ -260,6 +270,16 @@ TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, con
Ice::ObjectPrx
TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
{
+ if(!obj)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "subscribe with null subscriber.";
+ }
+ throw NullSubscriber();
+ }
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -399,7 +419,7 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
}
Ice::Identity id = topic->ice_getIdentity();
-
+
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p == _subscribers.end())
{
@@ -458,7 +478,7 @@ Ice::IdentitySeq
TransientTopicImpl::getSubscribers(const Ice::Current&) const
{
IceUtil::Mutex::Lock sync(*this);
-
+
Ice::IdentitySeq subscribers;
for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
{