summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TransientTopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TransientTopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TransientTopicI.cpp297
1 files changed, 95 insertions, 202 deletions
diff --git a/cpp/src/IceStorm/TransientTopicI.cpp b/cpp/src/IceStorm/TransientTopicI.cpp
index 055d32caf60..1691216eb3c 100644
--- a/cpp/src/IceStorm/TransientTopicI.cpp
+++ b/cpp/src/IceStorm/TransientTopicI.cpp
@@ -27,32 +27,22 @@ class TransientPublisherI : public Ice::BlobjectArray
{
public:
- TransientPublisherI(const TransientTopicImplPtr& impl) :
- _impl(impl)
+ TransientPublisherI(shared_ptr<TransientTopicImpl> impl) :
+ _impl(move(impl))
{
}
- virtual bool
- ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- Ice::ByteSeq&,
- const Ice::Current& current)
+ bool
+ ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams, Ice::ByteSeq&, const Ice::Current& current) override
{
// Use cached reads.
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
-
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx };
+
Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
+ event.data.swap(data);
EventDataSeq v;
- v.push_back(event);
+ v.push_back(move(event));
_impl->publish(false, v);
return true;
@@ -60,7 +50,7 @@ public:
private:
- const TransientTopicImplPtr _impl;
+ const shared_ptr<TransientTopicImpl> _impl;
};
//
@@ -71,33 +61,29 @@ class TransientTopicLinkI : public TopicLink
{
public:
- TransientTopicLinkI(const TransientTopicImplPtr& impl) :
- _impl(impl)
+ TransientTopicLinkI(shared_ptr<TransientTopicImpl> impl) :
+ _impl(move(impl))
{
}
- virtual void
- forward(const EventDataSeq& v, const Ice::Current& /*current*/)
+ void
+ forward(EventDataSeq v, const Ice::Current&) override
{
- _impl->publish(true, v);
+ _impl->publish(true, move(v));
}
private:
- const TransientTopicImplPtr _impl;
+ const shared_ptr<TransientTopicImpl> _impl;
};
}
-TransientTopicImpl::TransientTopicImpl(
- const InstancePtr& instance,
- const string& name,
- const Ice::Identity& id) :
- _instance(instance),
- _name(name),
- _id(id),
- _destroyed(false)
+shared_ptr<TransientTopicImpl>
+TransientTopicImpl::create(const shared_ptr<Instance>& instance, const std::string& name, const Ice::Identity& id)
{
+ shared_ptr<TransientTopicImpl> topicImpl(new TransientTopicImpl(instance, name, id));
+
//
// Create a servant per topic to receive event data. If the
// category is empty then we are in backwards compatibility
@@ -112,24 +98,34 @@ TransientTopicImpl::TransientTopicImpl(
Ice::Identity linkid;
if(id.category.empty())
{
- pubid.category = _name;
+ pubid.category = name;
pubid.name = "publish";
- linkid.category = _name;
+ linkid.category = name;
linkid.name = "link";
}
else
{
pubid.category = id.category;
- pubid.name = _name + ".publish";
+ pubid.name = name + ".publish";
linkid.category = id.category;
- linkid.name = _name + ".link";
+ linkid.name = name + ".link";
}
- _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid);
- _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid));
+ auto publisher = make_shared<TransientPublisherI>(topicImpl);
+ topicImpl->_publisherPrx = instance->publishAdapter()->add(publisher, pubid);
+ auto topicLink = make_shared<TransientTopicLinkI>(topicImpl);
+ topicImpl->_linkPrx = Ice::uncheckedCast<TopicLinkPrx>(instance->publishAdapter()->add(topicLink, linkid));
+
+ return topicImpl;
}
-TransientTopicImpl::~TransientTopicImpl()
+TransientTopicImpl::TransientTopicImpl(shared_ptr<Instance> instance,
+ const std::string& name,
+ const Ice::Identity& id) :
+ _instance(move(instance)),
+ _name(name),
+ _id(id),
+ _destroyed(false)
{
}
@@ -140,128 +136,26 @@ TransientTopicImpl::getName(const Ice::Current&) const
return _name;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
TransientTopicImpl::getPublisher(const Ice::Current&) const
{
// Immutable
return _publisherPrx;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current&) const
{
// Immutable
return _publisherPrx;
}
-void
-TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
-{
- if(!obj)
- {
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << ": subscribe: null proxy";
- }
- throw InvalidSubscriber("subscriber is a null proxy");
- }
- Ice::Identity id = obj->ice_getIdentity();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- QoS qos = origQoS;
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
-
- if(traceLevels->topic > 1)
- {
- out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
- << " QoS: ";
- for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
- {
- if(p != qos.begin())
- {
- out << ',';
- }
- out << '[' << p->first << "," << p->second << ']';
- }
- }
- }
-
- string reliability = "oneway";
- {
- QoS::iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- qos.erase(p);
- }
- }
-
- Ice::ObjectPrx newObj = obj;
- if(reliability == "batch")
- {
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
- {
- newObj = newObj->ice_batchOneway();
- }
- }
- else if(reliability == "twoway")
- {
- newObj = newObj->ice_twoway();
- }
- else if(reliability == "twoway ordered")
- {
- qos["reliability"] = "ordered";
- newObj = newObj->ice_twoway();
- }
- else // reliability == "oneway"
- {
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
- }
-
- Lock sync(*this);
- SubscriberRecord record;
- record.id = id;
- record.obj = newObj;
- record.theQoS = qos;
- record.topicName = _name;
- record.link = false;
- record.cost = 0;
-
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
- {
- // If we already have this subscriber remove it from our
- // subscriber list and remove it from the database.
- (*p)->destroy();
- _subscribers.erase(p);
- }
-
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
- _subscribers.push_back(subscriber);
-}
-
-Ice::ObjectPrx
-TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
+shared_ptr<Ice::ObjectPrx>
+TransientTopicImpl::subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj, const Ice::Current&)
{
if(!obj)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -271,7 +165,7 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr
}
Ice::Identity id = obj->ice_getIdentity();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -292,7 +186,7 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr
}
}
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
SubscriberRecord record;
record.id = id;
@@ -302,22 +196,21 @@ TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPr
record.link = false;
record.cost = 0;
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end())
{
throw AlreadySubscribed();
}
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ auto subscriber = Subscriber::create(_instance, record);
_subscribers.push_back(subscriber);
return subscriber->proxy();
}
void
-TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
+TransientTopicImpl::unsubscribe(shared_ptr<Ice::ObjectPrx> subscriber, const Ice::Current&)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(!subscriber)
{
if(traceLevels->topic > 0)
@@ -340,11 +233,12 @@ TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Cur
}
}
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
+
// First remove the subscriber from the subscribers list. Note
// that its possible that the subscriber isn't in the list, but is
// in the database if the subscriber was locally reaped.
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
+ auto p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
(*p)->destroy();
@@ -352,7 +246,7 @@ TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Cur
}
}
-TopicLinkPrx
+shared_ptr<TopicLinkPrx>
TransientTopicImpl::getLinkProxy(const Ice::Current&)
{
// immutable
@@ -360,12 +254,12 @@ TransientTopicImpl::getLinkProxy(const Ice::Current&)
}
void
-TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
+TransientTopicImpl::link(shared_ptr<TopicPrx> topic, int cost, const Ice::Current&)
{
- TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
- TopicLinkPrx link = internal->getLinkProxy();
+ auto internal = Ice::uncheckedCast<TopicInternalPrx>(topic);
+ auto link = internal->getLinkProxy();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -373,9 +267,9 @@ TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Curren
<< " cost " << cost;
}
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
- Ice::Identity id = topic->ice_getIdentity();
+ auto id = topic->ice_getIdentity();
SubscriberRecord record;
record.id = id;
@@ -385,32 +279,32 @@ TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Curren
record.link = true;
record.cost = cost;
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end())
{
throw LinkExists(IceStormInternal::identityToTopicName(id));
}
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ auto subscriber = Subscriber::create(_instance, record);
_subscribers.push_back(subscriber);
}
void
-TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
+TransientTopicImpl::unlink(shared_ptr<TopicPrx> topic, const Ice::Current&)
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
+
if(_destroyed)
{
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
- Ice::Identity id = topic->ice_getIdentity();
+ auto id = topic->ice_getIdentity();
+ auto traceLevels = _instance->traceLevels();
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
- if(p == _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), id) == _subscribers.end())
{
string name = IceStormInternal::identityToTopicName(id);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -419,7 +313,6 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
throw NoSuchLink(name);
}
- TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -429,7 +322,7 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
// Remove the subscriber from the subscribers list. Note
// that its possible that the subscriber isn't in the list, but is
// in the database if the subscriber was locally reaped.
- p = find(_subscribers.begin(), _subscribers.end(), id);
+ auto p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
(*p)->destroy();
@@ -440,12 +333,13 @@ TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
LinkInfoSeq
TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
+
LinkInfoSeq seq;
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- SubscriberRecord record = (*p)->record();
- if(record.link && !(*p)->errored())
+ SubscriberRecord record = subscriber->record();
+ if(record.link && !subscriber->errored())
{
LinkInfo info;
info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
@@ -460,12 +354,12 @@ TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
Ice::IdentitySeq
TransientTopicImpl::getSubscribers(const Ice::Current&) const
{
- IceUtil::Mutex::Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
Ice::IdentitySeq subscribers;
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- subscribers.push_back((*p)->id());
+ subscribers.push_back(subscriber->id());
}
return subscribers;
}
@@ -473,7 +367,7 @@ TransientTopicImpl::getSubscribers(const Ice::Current&) const
void
TransientTopicImpl::destroy(const Ice::Current&)
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
if(_destroyed)
{
@@ -481,7 +375,7 @@ TransientTopicImpl::destroy(const Ice::Current&)
}
_destroyed = true;
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -499,22 +393,22 @@ TransientTopicImpl::destroy(const Ice::Current&)
}
// Destroy all of the subscribers.
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- (*p)->destroy();
+ subscriber->destroy();
}
_subscribers.clear();
}
void
-TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&)
+TransientTopicImpl::reap(Ice::IdentitySeq, const Ice::Current&)
{
}
bool
TransientTopicImpl::destroyed() const
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
return _destroyed;
}
@@ -532,9 +426,9 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
// Copy of the subscriber list so that event publishing can occur
// in parallel.
//
- vector<SubscriberPtr> copy;
+ vector<shared_ptr<Subscriber>> copy;
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
copy = _subscribers;
}
@@ -542,12 +436,12 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
// Queue each event, gathering a list of those subscribers that
// must be reaped.
//
- vector<Ice::Identity> e;
- for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
+ vector<Ice::Identity> ids;
+ for(const auto& subscriber : copy)
{
- if(!(*p)->queue(forwarded, events) && (*p)->reap())
+ if(!subscriber->queue(forwarded, events) && subscriber->reap())
{
- e.push_back((*p)->id());
+ ids.push_back(subscriber->id());
}
}
@@ -555,10 +449,10 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
// Run through the error list removing those subscribers that are
// in error from the subscriber list.
//
- if(!e.empty())
+ if(!ids.empty())
{
- Lock sync(*this);
- for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
+ lock_guard<mutex> lg(_mutex);
+ for(const auto& id : ids)
{
//
// Its possible for the subscriber to already have been
@@ -574,14 +468,13 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
// error'd subscribers and remove it from the database on
// the next reap.
//
- vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
+ auto q = find(_subscribers.begin(), _subscribers.end(), id);
if(q != _subscribers.end())
{
- SubscriberPtr subscriber = *q;
//
// Destroy the subscriber.
//
- subscriber->destroy();
+ (*q)->destroy();
_subscribers.erase(q);
}
}
@@ -591,11 +484,11 @@ TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
void
TransientTopicImpl::shutdown()
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
// Shutdown each subscriber. This waits for the event queues to drain.
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- (*p)->shutdown();
+ subscriber->shutdown();
}
}