summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2021-01-28 16:26:44 -0500
committerJoe George <joe@zeroc.com>2021-02-01 16:59:30 -0500
commit92a6531e409f2691d82591e185a92299d415fc0f (patch)
tree60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceStorm/TopicI.cpp
parentPort Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff)
downloadice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2
ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz
ice-92a6531e409f2691d82591e185a92299d415fc0f.zip
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp465
1 files changed, 224 insertions, 241 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 480189c6387..5dcaff63153 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -21,7 +21,7 @@ namespace
{
void
-logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
+logError(const shared_ptr<Ice::Communicator>& com, const IceDB::LMDBException& ex)
{
Ice::Error error(com->getLogger());
error << "LMDB error: " << ex;
@@ -35,28 +35,24 @@ class PublisherI : public Ice::BlobjectArray
{
public:
- PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) :
- _topic(topic), _instance(instance)
+ PublisherI(shared_ptr<TopicImpl> topic, shared_ptr<PersistentInstance> instance) :
+ _topic(move(topic)), _instance(move(instance))
{
}
- virtual bool
- ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ bool
+ ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams,
Ice::ByteSeq&,
- const Ice::Current& current)
+ const Ice::Current& current) override
{
// The publish call does a cached read.
- EventDataPtr event = new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
+ EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx };
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
+ event.data = move(data);
EventDataSeq v;
- v.push_back(event);
+ v.push_back(move(event));
_topic->publish(false, v);
return true;
@@ -64,8 +60,8 @@ public:
private:
- const TopicImplPtr _topic;
- const PersistentInstancePtr _instance;
+ const shared_ptr<TopicImpl> _topic;
+ const shared_ptr<PersistentInstance> _instance;
};
//
@@ -76,66 +72,66 @@ class TopicLinkI : public TopicLink
{
public:
- TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
- _impl(impl), _instance(instance)
+ TopicLinkI(shared_ptr<TopicImpl> impl, shared_ptr<PersistentInstance> instance) :
+ _impl(move(impl)), _instance(move(instance))
{
}
- virtual void
- forward(const EventDataSeq& v, const Ice::Current& /*current*/)
+ void
+ forward(EventDataSeq v, const Ice::Current&) override
{
// The publish call does a cached read.
- _impl->publish(true, v);
+ _impl->publish(true, move(v));
}
private:
- const TopicImplPtr _impl;
- const PersistentInstancePtr _instance;
+ const shared_ptr<TopicImpl> _impl;
+ const shared_ptr<PersistentInstance> _instance;
};
class TopicI : public TopicInternal
{
public:
- TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
- _impl(impl), _instance(instance)
+ TopicI(shared_ptr<TopicImpl> impl, shared_ptr<PersistentInstance> instance) :
+ _impl(move(impl)), _instance(move(instance))
{
}
- virtual string getName(const Ice::Current&) const
+ string getName(const Ice::Current&) const override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getName();
}
- virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const
+ shared_ptr<Ice::ObjectPrx> getPublisher(const Ice::Current&) const override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getPublisher();
}
- virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const
+ shared_ptr<Ice::ObjectPrx> getNonReplicatedPublisher(const Ice::Current&) const override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getNonReplicatedPublisher();
}
- virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj,
- const Ice::Current& current)
+ shared_ptr<Ice::ObjectPrx> subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj,
+ const Ice::Current& current) override
{
while(true)
{
Ice::Long generation = -1;
- TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ auto master = getMasterFor(current, generation, __FILE__, __LINE__);
if(master)
{
try
{
- return master->subscribeAndGetPublisher(qos, obj);
+ return master->subscribeAndGetPublisher(move(qos), move(obj));
}
catch(const Ice::ConnectFailedException&)
{
@@ -151,22 +147,22 @@ public:
else
{
FinishUpdateHelper unlock(_instance->node());
- return _impl->subscribeAndGetPublisher(qos, obj);
+ return _impl->subscribeAndGetPublisher(move(qos), move(obj));
}
}
}
- virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
+ void unsubscribe(shared_ptr<Ice::ObjectPrx> subscriber, const Ice::Current& current) override
{
while(true)
{
Ice::Long generation = -1;
- TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ auto master = getMasterFor(current, generation, __FILE__, __LINE__);
if(master)
{
try
{
- master->unsubscribe(subscriber);
+ master->unsubscribe(move(subscriber));
}
catch(const Ice::ConnectFailedException&)
{
@@ -182,22 +178,22 @@ public:
else
{
FinishUpdateHelper unlock(_instance->node());
- _impl->unsubscribe(subscriber);
+ _impl->unsubscribe(move(subscriber));
}
break;
}
}
- virtual TopicLinkPrx getLinkProxy(const Ice::Current&)
+ shared_ptr<TopicLinkPrx> getLinkProxy(const Ice::Current&) override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getLinkProxy();
}
- virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& /*current*/)
+ void reap(Ice::IdentitySeq ids, const Ice::Current&) override
{
- NodeIPtr node = _instance->node();
+ auto node = _instance->node();
if(!node->updateMaster(__FILE__, __LINE__))
{
throw ReapWouldBlock();
@@ -206,17 +202,17 @@ public:
_impl->reap(ids);
}
- virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
+ void link(shared_ptr<TopicPrx> topic, int cost, const Ice::Current& current) override
{
while(true)
{
Ice::Long generation = -1;
- TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ auto master = getMasterFor(current, generation, __FILE__, __LINE__);
if(master)
{
try
{
- master->link(topic, cost);
+ master->link(move(topic), cost);
}
catch(const Ice::ConnectFailedException&)
{
@@ -232,23 +228,23 @@ public:
else
{
FinishUpdateHelper unlock(_instance->node());
- _impl->link(topic, cost);
+ _impl->link(move(topic), cost);
}
break;
}
}
- virtual void unlink(const TopicPrx& topic, const Ice::Current& current)
+ void unlink(shared_ptr<TopicPrx> topic, const Ice::Current& current) override
{
while(true)
{
Ice::Long generation = -1;
- TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ auto master = getMasterFor(current, generation, __FILE__, __LINE__);
if(master)
{
try
{
- master->unlink(topic);
+ master->unlink(move(topic));
}
catch(const Ice::ConnectFailedException&)
{
@@ -264,30 +260,30 @@ public:
else
{
FinishUpdateHelper unlock(_instance->node());
- _impl->unlink(topic);
+ _impl->unlink(move(topic));
}
break;
}
}
- virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const
+ LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
return _impl->getLinkInfoSeq();
}
- virtual Ice::IdentitySeq getSubscribers(const Ice::Current&) const
+ Ice::IdentitySeq getSubscribers(const Ice::Current&) const override
{
return _impl->getSubscribers();
}
- virtual void destroy(const Ice::Current& current)
+ void destroy(const Ice::Current& current) override
{
while(true)
{
Ice::Long generation = -1;
- TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ auto master = getMasterFor(current, generation, __FILE__, __LINE__);
if(master)
{
try
@@ -316,29 +312,72 @@ public:
private:
- TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const
+ shared_ptr<TopicPrx> getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const
{
- NodeIPtr node = _instance->node();
- Ice::ObjectPrx master;
+ auto node = _instance->node();
+ shared_ptr<Ice::ObjectPrx> master;
if(node)
{
master = _instance->node()->startUpdate(generation, file, line);
}
- return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
+ return master ? Ice::uncheckedCast<TopicPrx>(master->ice_identity(cur.id)) : nullptr;
}
- const TopicImplPtr _impl;
- const PersistentInstancePtr _instance;
+ const shared_ptr<TopicImpl> _impl;
+ const shared_ptr<PersistentInstance> _instance;
};
}
-TopicImpl::TopicImpl(
- const PersistentInstancePtr& instance,
- const string& name,
- const Ice::Identity& id,
- const SubscriberRecordSeq& subscribers) :
- _instance(instance),
+shared_ptr<TopicImpl>
+TopicImpl::create(shared_ptr<PersistentInstance> instance,
+ const string& name,
+ const Ice::Identity& id,
+ const SubscriberRecordSeq& subscribers)
+{
+ shared_ptr<TopicImpl> topicImpl(new TopicImpl(instance, name, id, subscribers));
+
+ topicImpl->_servant = make_shared<TopicI>(topicImpl, instance);
+ //
+ // Create a servant per topic to receive event data. If the
+ // category is empty then we are in backwards compatibility
+ // mode. In this case the servant's identity is
+ // category=<topicname>, name=publish, otherwise the name is
+ // <instancename>/<topicname>.publish. The same applies to the
+ // link proxy.
+ //
+ // Activate the object and save a reference to give to publishers.
+ //
+ Ice::Identity pubid;
+ Ice::Identity linkid;
+ if(id.category.empty())
+ {
+ pubid.category = name;
+ pubid.name = "publish";
+ linkid.category = name;
+ linkid.name = "link";
+ }
+ else
+ {
+ pubid.category = id.category;
+ pubid.name = name + ".publish";
+ linkid.category = id.category;
+ linkid.name = name + ".link";
+ }
+
+ auto publisher = make_shared<PublisherI>(topicImpl, instance);
+ topicImpl->_publisherPrx = instance->publishAdapter()->add(publisher, pubid);
+ auto topicLink = make_shared<TopicLinkI>(topicImpl, instance);
+ topicImpl->_linkPrx = Ice::uncheckedCast<TopicLinkPrx>(instance->publishAdapter()->add(topicLink, linkid));
+
+ return topicImpl;
+}
+
+TopicImpl::TopicImpl(shared_ptr<PersistentInstance> instance,
+ const string& name,
+ const Ice::Identity& id,
+ const SubscriberRecordSeq& subscribers) :
+ _instance(move(instance)),
_name(name),
_id(id),
_destroyed(false),
@@ -347,57 +386,20 @@ TopicImpl::TopicImpl(
{
try
{
- __setNoDelete(true);
-
- // TODO: If we want to improve the performance of the
- // non-replicated case we could allocate a null-topic impl here.
- _servant = new TopicI(this, instance);
-
- //
- // Create a servant per topic to receive event data. If the
- // category is empty then we are in backwards compatibility
- // mode. In this case the servant's identity is
- // category=<topicname>, name=publish, otherwise the name is
- // <instancename>/<topicname>.publish. The same applies to the
- // link proxy.
- //
- // Activate the object and save a reference to give to publishers.
- //
- Ice::Identity pubid;
- Ice::Identity linkid;
- if(id.category.empty())
- {
- pubid.category = _name;
- pubid.name = "publish";
- linkid.category = _name;
- linkid.name = "link";
- }
- else
- {
- pubid.category = id.category;
- pubid.name = _name + ".publish";
- linkid.category = id.category;
- linkid.name = _name + ".link";
- }
-
- _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
- _linkPrx = TopicLinkPrx::uncheckedCast(
- _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
-
//
// Re-establish subscribers.
//
- for(SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
+ for(const auto& subscriber : subscribers)
{
- Ice::Identity ident = p->obj->ice_getIdentity();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ Ice::Identity ident = subscriber.obj->ice_getIdentity();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
out << _name << " recreate " << _instance->communicator()->identityToString(ident);
if(traceLevels->topic > 1)
{
- out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
+ out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber.obj);
}
}
@@ -407,8 +409,7 @@ TopicImpl::TopicImpl(
// Create the subscriber object add it to the set of
// subscribers.
//
- SubscriberPtr subscriber = Subscriber::create(_instance, *p);
- _subscribers.push_back(subscriber);
+ _subscribers.push_back(Subscriber::create(_instance, subscriber));
}
catch(const Ice::Exception& ex)
{
@@ -416,7 +417,7 @@ TopicImpl::TopicImpl(
out << _name << " recreate " << _instance->communicator()->identityToString(ident);
if(traceLevels->topic > 1)
{
- out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
+ out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber.obj);
}
out << " failed: " << ex;
}
@@ -424,16 +425,14 @@ TopicImpl::TopicImpl(
if(_instance->observer())
{
- _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
+ _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, nullptr));
}
}
- catch(...)
+ catch(const std::exception&)
{
shutdown();
- __setNoDelete(false);
throw;
}
- __setNoDelete(false);
}
string
@@ -443,7 +442,7 @@ TopicImpl::getName() const
return _name;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
TopicImpl::getPublisher() const
{
// Immutable
@@ -454,7 +453,7 @@ TopicImpl::getPublisher() const
return _publisherPrx;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
TopicImpl::getNonReplicatedPublisher() const
{
// If there is an adapter id configured then we're using icegrid
@@ -472,10 +471,10 @@ TopicImpl::getNonReplicatedPublisher() const
namespace
{
void
-trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s)
+trace(Ice::Trace& out, const shared_ptr<PersistentInstance>& instance, const vector<shared_ptr<Subscriber>>& s)
{
out << '[';
- for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
+ for(auto p = s.cbegin(); p != s.cend(); ++p)
{
if(p != s.begin())
{
@@ -487,12 +486,12 @@ trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<Subsc
}
}
-Ice::ObjectPrx
-TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
+shared_ptr<Ice::ObjectPrx>
+TopicImpl::subscribeAndGetPublisher(QoS qos, shared_ptr<Ice::ObjectPrx> obj)
{
if(!obj)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -500,9 +499,9 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
}
throw InvalidSubscriber("subscriber is a null proxy");
}
- Ice::Identity id = obj->ice_getIdentity();
+ auto id = obj->ice_getIdentity();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -525,7 +524,7 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
}
}
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
SubscriberRecord record;
record.id = id;
@@ -535,15 +534,14 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
record.link = false;
record.cost = 0;
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end())
{
throw AlreadySubscribed();
}
LogUpdate llu;
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ auto subscriber = Subscriber::create(_instance, record);
try
{
IceDB::ReadWriteTxn txn(_instance->dbEnv());
@@ -572,9 +570,9 @@ TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
}
void
-TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
+TopicImpl::unsubscribe(const shared_ptr<Ice::ObjectPrx>& subscriber)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(!subscriber)
{
if(traceLevels->topic > 0)
@@ -599,31 +597,32 @@ TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
}
}
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
+
Ice::IdentitySeq ids;
ids.push_back(id);
removeSubscribers(ids);
}
-TopicLinkPrx
+shared_ptr<TopicLinkPrx>
TopicImpl::getLinkProxy()
{
// immutable
if(_instance->publisherReplicaProxy())
{
- return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity(
+ return Ice::uncheckedCast<TopicLinkPrx>(_instance->publisherReplicaProxy()->ice_identity(
_linkPrx->ice_getIdentity()));
}
return _linkPrx;
}
void
-TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
+TopicImpl::link(const shared_ptr<TopicPrx>& topic, int cost)
{
- TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
- TopicLinkPrx link = internal->getLinkProxy();
+ auto internal = Ice::uncheckedCast<TopicInternalPrx>(topic);
+ auto link = internal->getLinkProxy();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -631,7 +630,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
<< " cost " << cost;
}
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
Ice::Identity id = topic->ice_getIdentity();
@@ -643,8 +642,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
record.link = true;
record.cost = cost;
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end())
{
string name = IceStormInternal::identityToTopicName(id);
throw LinkExists(name);
@@ -652,7 +650,7 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
LogUpdate llu;
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ auto subscriber = Subscriber::create(_instance, record);
try
{
@@ -680,9 +678,10 @@ TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
}
void
-TopicImpl::unlink(const TopicPrx& topic)
+TopicImpl::unlink(const shared_ptr<TopicPrx>& topic)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
+
if(_destroyed)
{
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
@@ -690,11 +689,12 @@ TopicImpl::unlink(const TopicPrx& topic)
Ice::Identity id = topic->ice_getIdentity();
- vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
- if(p == _subscribers.end())
+ auto traceLevels = _instance->traceLevels();
+
+ if(find(_subscribers.begin(), _subscribers.end(), id) == _subscribers.end())
{
string name = IceStormInternal::identityToTopicName(id);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -704,7 +704,6 @@ TopicImpl::unlink(const TopicPrx& topic)
throw NoSuchLink(name);
}
- TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -719,9 +718,9 @@ TopicImpl::unlink(const TopicPrx& topic)
void
TopicImpl::reap(const Ice::IdentitySeq& ids)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -742,13 +741,14 @@ TopicImpl::reap(const Ice::IdentitySeq& ids)
void
TopicImpl::shutdown()
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
+
_servant = 0;
// Shutdown each subscriber. This waits for the event queues to drain.
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- (*p)->shutdown();
+ subscriber->shutdown();
}
_observer.detach();
@@ -757,13 +757,13 @@ TopicImpl::shutdown()
LinkInfoSeq
TopicImpl::getLinkInfoSeq() const
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
LinkInfoSeq seq;
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- SubscriberRecord record = (*p)->record();
- if(record.link && !(*p)->errored())
+ SubscriberRecord record = subscriber->record();
+ if(record.link && !subscriber->errored())
{
LinkInfo info;
info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
@@ -778,12 +778,12 @@ TopicImpl::getLinkInfoSeq() const
Ice::IdentitySeq
TopicImpl::getSubscribers() const
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
Ice::IdentitySeq subscribers;
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- subscribers.push_back((*p)->id());
+ subscribers.push_back(subscriber->id());
}
return subscribers;
}
@@ -791,7 +791,7 @@ TopicImpl::getSubscribers() const
void
TopicImpl::destroy()
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
if(_destroyed)
{
@@ -799,7 +799,7 @@ TopicImpl::destroy()
}
_destroyed = true;
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -807,7 +807,7 @@ TopicImpl::destroy()
}
// destroyInternal clears out the topic content.
- LogUpdate llu = {0,0};
+ LogUpdate llu = { 0,0 };
_instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
_observer.detach();
@@ -816,18 +816,18 @@ TopicImpl::destroy()
TopicContent
TopicImpl::getContent() const
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
TopicContent content;
content.id = _id;
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
// Don't return errored subscribers (subscribers that have
// errored out, but not reaped due to a failure with the
// master). This means we can avoid the reaping step later.
- if(!(*p)->errored())
+ if(!subscriber->errored())
{
- content.records.push_back((*p)->record());
+ content.records.push_back(subscriber->record());
}
}
return content;
@@ -836,7 +836,7 @@ TopicImpl::getContent() const
void
TopicImpl::update(const SubscriberRecordSeq& records)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
// We do this with two scans. The first runs through the subscribers
// that we have and removes those not in the init list. The second
@@ -844,7 +844,7 @@ TopicImpl::update(const SubscriberRecordSeq& records)
// exist.
{
- vector<SubscriberPtr>::iterator p = _subscribers.begin();
+ auto p = _subscribers.begin();
while(p != _subscribers.end())
{
SubscriberRecordSeq::const_iterator q;
@@ -871,19 +871,19 @@ TopicImpl::update(const SubscriberRecordSeq& records)
}
}
- for(SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
+ for(const auto& record : records)
{
- vector<SubscriberPtr>::iterator q;
+ vector<shared_ptr<Subscriber>>::iterator q;
for(q = _subscribers.begin(); q != _subscribers.end(); ++q)
{
- if((*q)->id() == p->id)
+ if((*q)->id() == record.id)
{
break;
}
}
if(q == _subscribers.end())
{
- SubscriberPtr subscriber = Subscriber::create(_instance, *p);
+ auto subscriber = Subscriber::create(_instance, record);
_subscribers.push_back(subscriber);
}
}
@@ -892,7 +892,8 @@ TopicImpl::update(const SubscriberRecordSeq& records)
bool
TopicImpl::destroyed() const
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lg(_subscribersMutex);
+
return _destroyed;
}
@@ -903,11 +904,11 @@ TopicImpl::id() const
return _id;
}
-TopicPrx
+shared_ptr<TopicPrx>
TopicImpl::proxy() const
{
// immutable
- Ice::ObjectPrx prx;
+ shared_ptr<Ice::ObjectPrx> prx;
if(_instance->topicReplicaProxy())
{
prx = _instance->topicReplicaProxy()->ice_identity(_id);
@@ -916,44 +917,13 @@ TopicImpl::proxy() const
{
prx = _instance->topicAdapter()->createProxy(_id);
}
- return TopicPrx::uncheckedCast(prx);
-}
-
-namespace
-{
-
-class TopicInternalReapCB : public IceUtil::Shared
-{
-public:
-
- TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) :
- _instance(instance), _generation(generation)
- {
- }
-
- virtual void exception(const Ice::Exception& ex)
- {
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "exception when calling `reap' on the master replica: " << ex;
- }
- _instance->node()->recovery(_generation);
- }
-
-private:
-
- const PersistentInstancePtr _instance;
- const Ice::Long _generation;
-};
-
+ return Ice::uncheckedCast<TopicPrx>(prx);
}
void
TopicImpl::publish(bool forwarded, const EventDataSeq& events)
{
- TopicInternalPrx masterInternal;
+ shared_ptr<TopicInternalPrx> masterInternal;
Ice::Long generation = -1;
Ice::IdentitySeq reap;
{
@@ -964,9 +934,10 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
// Copy of the subscriber list so that event publishing can occur
// in parallel.
//
- vector<SubscriberPtr> copy;
+ vector<shared_ptr<Subscriber>> copy;
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
+
if(_observer)
{
if(forwarded)
@@ -985,11 +956,11 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
// Queue each event, gathering a list of those subscribers that
// must be reaped.
//
- for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
+ for(const auto& subscriber : copy)
{
- if(!(*p)->queue(forwarded, events) && (*p)->reap())
+ if(!subscriber->queue(forwarded, events) && subscriber->reap())
{
- reap.push_back((*p)->id());
+ reap.push_back(subscriber->id());
}
}
@@ -1000,11 +971,11 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
}
if(!unlock.getMaster())
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
removeSubscribers(reap);
return;
}
- masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
+ masterInternal = Ice::uncheckedCast<TopicInternalPrx>(unlock.getMaster()->ice_identity(_id));
generation = unlock.generation();
}
@@ -1017,16 +988,32 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
// call may raise an exception in the caller (that is directly
// call ice_exception) which calls recover() on the node which
// would result in a deadlock since the node is locked.
- masterInternal->begin_reap(reap, newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation),
- &TopicInternalReapCB::exception));
+
+ masterInternal->reapAsync(reap, nullptr, [instance = _instance, generation](exception_ptr ex)
+ {
+ auto traceLevels = instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ try
+ {
+ rethrow_exception(ex);
+ }
+ catch(const std::exception& e)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "exception when calling `reap' on the master replica: " << e;
+ }
+ }
+ instance->node()->recovery(generation);
+ });
}
void
TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& record)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -1048,8 +1035,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
out << " llu: " << llu.generation << "/" << llu.iteration;
}
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
- if(p != _subscribers.end())
+ if(find(_subscribers.begin(), _subscribers.end(), record.id) != _subscribers.end())
{
// If the subscriber is already in the database display a
// diagnostic.
@@ -1061,7 +1047,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
return;
}
- SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ auto subscriber = Subscriber::create(_instance, record);
try
{
IceDB::ReadWriteTxn txn(_instance->dbEnv());
@@ -1089,7 +1075,7 @@ TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& r
void
TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -1105,19 +1091,16 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
out << " llu: " << llu.generation << "/" << llu.iteration;
}
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
// First remove from the database.
try
{
IceDB::ReadWriteTxn txn(_instance->dbEnv());
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ for(const auto& id : ids)
{
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = *id;
-
+ SubscriberRecordKey key = { _id, id };
_subscriberMap.del(txn, key);
}
@@ -1134,9 +1117,9 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
// Then remove the subscriber from the subscribers list. If the
// subscriber had a local failure and was removed from the
// subscriber list it could already be gone. That's not a problem.
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ for(const auto& id : ids)
{
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
+ auto p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
(*p)->destroy();
@@ -1148,7 +1131,7 @@ TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq
void
TopicImpl::observerDestroyTopic(const LogUpdate& llu)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
if(_destroyed)
{
@@ -1156,7 +1139,7 @@ TopicImpl::observerDestroyTopic(const LogUpdate& llu)
}
_destroyed = true;
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -1166,7 +1149,7 @@ TopicImpl::observerDestroyTopic(const LogUpdate& llu)
destroyInternal(llu, false);
}
-Ice::ObjectPtr
+shared_ptr<Ice::Object>
TopicImpl::getServant() const
{
return _servant;
@@ -1175,7 +1158,8 @@ TopicImpl::getServant() const
void
TopicImpl::updateObserver()
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
+ lock_guard<mutex> lock(_subscribersMutex);
+
if(_instance->observer())
{
_observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get()));
@@ -1185,10 +1169,11 @@ TopicImpl::updateObserver()
void
TopicImpl::updateSubscriberObservers()
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ lock_guard<mutex> lock(_subscribersMutex);
+
+ for(const auto& subscriber : _subscribers)
{
- (*p)->updateObserver();
+ subscriber->updateObserver();
}
}
@@ -1243,9 +1228,9 @@ TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
_instance->topicReaper()->add(_name);
// Destroy each of the subscribers.
- for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ for(const auto& subscriber : _subscribers)
{
- (*p)->destroy();
+ subscriber->destroy();
}
_subscribers.clear();
@@ -1267,11 +1252,9 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
{
IceDB::ReadWriteTxn txn(_instance->dbEnv());
- for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ for(const auto& id : ids)
{
- SubscriberRecordKey key;
- key.topic = _id;
- key.id = *id;
+ SubscriberRecordKey key = { _id, id };
if(_subscriberMap.del(txn, key))
{
@@ -1305,7 +1288,7 @@ TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
// removed.
for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
{
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
+ auto p = find(_subscribers.begin(), _subscribers.end(), *id);
if(p != _subscribers.end())
{
(*p)->destroy();