summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp484
1 files changed, 242 insertions, 242 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index a88bc1c1744..1831a746ac5 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -35,33 +35,33 @@ class PublisherI : public Ice::BlobjectArray
public:
PublisherI(const TopicIPtr& topic) :
- _topic(topic)
+ _topic(topic)
{
}
virtual bool
ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- Ice::ByteSeq&,
- const Ice::Current& current)
+ Ice::ByteSeq&,
+ const Ice::Current& current)
{
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
+ EventDataPtr event = new EventData(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
- Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
-
- EventDataSeq v;
- v.push_back(event);
- _topic->publish(false, v);
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
+
+ EventDataSeq v;
+ v.push_back(event);
+ _topic->publish(false, v);
- return true;
+ return true;
}
private:
@@ -78,14 +78,14 @@ class TopicLinkI : public TopicLink
public:
TopicLinkI(const TopicIPtr& topic) :
- _topic(topic)
+ _topic(topic)
{
}
virtual void
forward(const EventDataSeq& v, const Ice::Current& current)
{
- _topic->publish(true, v);
+ _topic->publish(true, v);
}
private:
@@ -128,17 +128,17 @@ TopicI::TopicI(
Ice::Identity linkid;
if(id.category.empty())
{
- pubid.category = _name;
- pubid.name = "publish";
- linkid.category = _name;
- linkid.name = "link";
+ pubid.category = _name;
+ pubid.name = "publish";
+ linkid.category = _name;
+ linkid.name = "link";
}
else
{
- pubid.category = id.category;
- pubid.name = _name + ".publish";
- linkid.category = id.category;
- linkid.name = _name + ".link";
+ pubid.category = id.category;
+ pubid.name = _name + ".publish";
+ linkid.category = id.category;
+ linkid.name = _name + ".link";
}
_publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid);
@@ -149,20 +149,20 @@ TopicI::TopicI(
//
for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity());
- }
-
- //
- // Create the subscriber object add it to the set of
- // subscribers.
- //
- SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost);
- _subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity());
+ }
+
+ //
+ // Create the subscriber object add it to the set of
+ // subscribers.
+ //
+ SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost);
+ _subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
}
@@ -193,11 +193,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
{
while(start != end)
{
- if(*start == ident)
- {
- return start;
- }
- ++start;
+ if(*start == ident)
+ {
+ return start;
+ }
+ ++start;
}
return end;
}
@@ -211,74 +211,74 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr
QoS qos = origQoS;
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
- if(traceLevels->topic > 1)
- {
- out << " QoS: ";
- for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
- {
- if(p != qos.begin())
- {
- out << ',';
- }
- out << '[' << p->first << "," << p->second << ']';
- }
- }
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ if(traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
+ {
+ if(p != qos.begin())
+ {
+ out << ',';
+ }
+ out << '[' << p->first << "," << p->second << ']';
+ }
+ }
}
string reliability = "oneway";
{
- QoS::iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- qos.erase(p);
- }
+ QoS::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ qos.erase(p);
+ }
}
Ice::ObjectPrx newObj = obj;
if(reliability == "batch")
{
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
- {
- newObj = newObj->ice_batchOneway();
- }
+ if(newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_batchDatagram();
+ }
+ else
+ {
+ newObj = newObj->ice_batchOneway();
+ }
}
else if(reliability == "twoway")
{
- newObj = newObj->ice_twoway();
+ newObj = newObj->ice_twoway();
}
else if(reliability == "twoway ordered")
{
- qos["reliability"] = "ordered";
- newObj = newObj->ice_twoway();
+ qos["reliability"] = "ordered";
+ newObj = newObj->ice_twoway();
}
else // reliability == "oneway"
{
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
+ if(reliability != "oneway" && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << reliability <<" mode not understood.";
+ }
+ if(!newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_oneway();
+ }
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- (*p)->destroy();
- _instance->subscriberPool()->remove(*p);
- _subscribers.erase(p);
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
}
SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos);
@@ -293,27 +293,27 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
- if(traceLevels->topic > 1)
- {
- out << " QoS: ";
- for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
- {
- if(p != qos.begin())
- {
- out << ',';
- }
- out << '[' << p->first << "," << p->second << ']';
- }
- }
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ if(traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
+ {
+ if(p != qos.begin())
+ {
+ out << ',';
+ }
+ out << '[' << p->first << "," << p->second << ']';
+ }
+ }
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- throw AlreadySubscribed();
+ throw AlreadySubscribed();
}
SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
@@ -329,20 +329,20 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(!subscriber)
{
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "unsubscribe with null subscriber.";
- }
- return;
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "unsubscribe with null subscriber.";
+ }
+ return;
}
Ice::Identity id = subscriber->ice_getIdentity();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Unsubscribe: " << _instance->communicator()->identityToString(id);
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Unsubscribe: " << _instance->communicator()->identityToString(id);
}
//
@@ -367,7 +367,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
reap();
@@ -379,20 +379,20 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
// link.
for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
{
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- LinkExists ex;
- ex.name = name;
- throw ex;
- }
+ if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
+ {
+ LinkExists ex;
+ ex.name = name;
+ throw ex;
+ }
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " link " << _instance->communicator()->identityToString(id)
- << " cost " << cost;
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " link " << _instance->communicator()->identityToString(id)
+ << " cost " << cost;
}
SubscriberPtr subscriber = Subscriber::create(_instance, link, cost);
@@ -422,7 +422,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
reap();
@@ -433,24 +433,24 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
LinkRecordSeq::iterator p = _topicRecord.begin();
while(p != _topicRecord.end())
{
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- break;
- }
- ++p;
+ if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
+ {
+ break;
+ }
+ ++p;
}
if(p == _topicRecord.end())
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << name << " failed - not linked";
- }
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " unlink " << name << " failed - not linked";
+ }
- NoSuchLink ex;
- ex.name = name;
- throw ex;
+ NoSuchLink ex;
+ ex.name = name;
+ throw ex;
}
Ice::ObjectPrx subscriber = p->obj;
@@ -464,8 +464,8 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << _instance->communicator()->identityToString(id);
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " unlink " << _instance->communicator()->identityToString(id);
}
removeSubscriber(subscriber);
}
@@ -481,11 +481,11 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const
for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q)
{
- LinkInfo info;
- info.name = identityToTopicName(q->theTopic->ice_getIdentity());
- info.cost = q->cost;
- info.theTopic = q->theTopic;
- seq.push_back(info);
+ LinkInfo info;
+ info.name = identityToTopicName(q->theTopic->ice_getIdentity());
+ info.cost = q->cost;
+ info.theTopic = q->theTopic;
+ seq.push_back(info);
}
return seq;
@@ -498,18 +498,18 @@ TopicI::destroy(const Ice::Current&)
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
_destroyed = true;
try
{
- _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity());
- _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity());
+ _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity());
+ _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity());
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
- // Ignore -- this could occur on shutdown.
+ // Ignore -- this could occur on shutdown.
}
}
@@ -533,7 +533,7 @@ TopicI::reap()
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- return;
+ return;
}
bool updated = false;
@@ -543,46 +543,46 @@ TopicI::reap()
//
list<SubscriberPtr> error;
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.swap(error);
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.swap(error);
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p)
{
- SubscriberPtr subscriber = *p;
- assert(subscriber->persistent()); // Only persistent subscribers need to be reaped.
-
- bool found = false;
- //
- // If this turns out to be a performance problem then we
- // can create an in memory map cache.
- //
- LinkRecordSeq::iterator q = _topicRecord.begin();
- while(q != _topicRecord.end())
- {
- if(q->obj->ice_getIdentity() == subscriber->id())
- {
- _topicRecord.erase(q);
- updated = true;
- found = true;
- break;
- }
- ++q;
- }
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "reaping " << _instance->communicator()->identityToString(subscriber->id());
- if(!found)
- {
- out << ": failed - not in database";
- }
- }
+ SubscriberPtr subscriber = *p;
+ assert(subscriber->persistent()); // Only persistent subscribers need to be reaped.
+
+ bool found = false;
+ //
+ // If this turns out to be a performance problem then we
+ // can create an in memory map cache.
+ //
+ LinkRecordSeq::iterator q = _topicRecord.begin();
+ while(q != _topicRecord.end())
+ {
+ if(q->obj->ice_getIdentity() == subscriber->id())
+ {
+ _topicRecord.erase(q);
+ updated = true;
+ found = true;
+ break;
+ }
+ ++q;
+ }
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "reaping " << _instance->communicator()->identityToString(subscriber->id());
+ if(!found)
+ {
+ out << ": failed - not in database";
+ }
+ }
}
if(updated)
{
- _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
+ _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
}
}
@@ -595,8 +595,8 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
//
vector<SubscriberPtr> copy;
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- copy = _subscribers;
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ copy = _subscribers;
}
//
@@ -608,18 +608,18 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
list<SubscriberPtr> flush;
for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
{
- Subscriber::QueueState state = (*p)->queue(forwarded, events);
- switch(state)
- {
- case Subscriber::QueueStateError:
- e.push_back((*p)->id());
- break;
- case Subscriber::QueueStateFlush:
- flush.push_back(*p);
- break;
- case Subscriber::QueueStateNoFlush:
- break;
- }
+ Subscriber::QueueState state = (*p)->queue(forwarded, events);
+ switch(state)
+ {
+ case Subscriber::QueueStateError:
+ e.push_back((*p)->id());
+ break;
+ case Subscriber::QueueStateFlush:
+ flush.push_back(*p);
+ break;
+ case Subscriber::QueueStateNoFlush:
+ break;
+ }
}
//
@@ -627,7 +627,7 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
//
if(!flush.empty())
{
- _instance->subscriberPool()->flush(flush);
+ _instance->subscriberPool()->flush(flush);
}
//
@@ -637,48 +637,48 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
list<SubscriberPtr> reap;
if(!e.empty())
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
- {
- //
- // Its possible for the subscriber to already have been
- // removed since the copy is iterated over outside of
- // mutex protection.
- //
- // Note that although this could be quicker if we used a
- // map, the most optimal case should be pushing around
- // events not searching for a particular subscriber.
- //
- // The subscriber is immediately destroyed & removed from
- // the _subscribers list. If the subscriber is persistent
- // its added to an list of error'd subscribers and removed
- // from the database on the next reap.
- //
- vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
- if(q != _subscribers.end())
- {
- //
- // Destroy the subscriber in any case.
- //
- (*q)->destroy();
- if((*q)->persistent())
- {
- reap.push_back(*q);
- }
- _instance->subscriberPool()->remove(*q);
- _subscribers.erase(q);
- }
- }
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
+ {
+ //
+ // Its possible for the subscriber to already have been
+ // removed since the copy is iterated over outside of
+ // mutex protection.
+ //
+ // Note that although this could be quicker if we used a
+ // map, the most optimal case should be pushing around
+ // events not searching for a particular subscriber.
+ //
+ // The subscriber is immediately destroyed & removed from
+ // the _subscribers list. If the subscriber is persistent
+ // its added to an list of error'd subscribers and removed
+ // from the database on the next reap.
+ //
+ vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
+ if(q != _subscribers.end())
+ {
+ //
+ // Destroy the subscriber in any case.
+ //
+ (*q)->destroy();
+ if((*q)->persistent())
+ {
+ reap.push_back(*q);
+ }
+ _instance->subscriberPool()->remove(*q);
+ _subscribers.erase(q);
+ }
+ }
}
if(!reap.empty())
{
- //
- // This is why _error is a list, so we can splice on the
- // reaped subscribers.
- //
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.splice(_error.begin(), reap);
+ //
+ // This is why _error is a list, so we can splice on the
+ // reaped subscribers.
+ //
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.splice(_error.begin(), reap);
}
}
@@ -691,10 +691,10 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- (*p)->destroy();
- _instance->subscriberPool()->remove(*p);
- _subscribers.erase(p);
- return;
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
+ return;
}
//
@@ -703,7 +703,7 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _instance->communicator()->identityToString(id) << ": not subscribed.";
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _instance->communicator()->identityToString(id) << ": not subscribed.";
}
}