diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 484 |
1 files changed, 242 insertions, 242 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index a88bc1c1744..1831a746ac5 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -35,33 +35,33 @@ class PublisherI : public Ice::BlobjectArray public: PublisherI(const TopicIPtr& topic) : - _topic(topic) + _topic(topic) { } virtual bool ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - Ice::ByteSeq&, - const Ice::Current& current) + Ice::ByteSeq&, + const Ice::Current& current) { - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); + EventDataPtr event = new EventData( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); - Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); - - EventDataSeq v; - v.push_back(event); - _topic->publish(false, v); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); + + EventDataSeq v; + v.push_back(event); + _topic->publish(false, v); - return true; + return true; } private: @@ -78,14 +78,14 @@ class TopicLinkI : public TopicLink public: TopicLinkI(const TopicIPtr& topic) : - _topic(topic) + _topic(topic) { } virtual void forward(const EventDataSeq& v, const Ice::Current& current) { - _topic->publish(true, v); + _topic->publish(true, v); } private: @@ -128,17 +128,17 @@ TopicI::TopicI( Ice::Identity linkid; if(id.category.empty()) { - pubid.category = _name; - pubid.name = "publish"; - linkid.category = _name; - linkid.name = "link"; + pubid.category = _name; + pubid.name = "publish"; + linkid.category = _name; + linkid.name = "link"; } else { - pubid.category = id.category; - pubid.name = _name + ".publish"; - linkid.category = id.category; - linkid.name = _name + ".link"; + pubid.category = id.category; + pubid.name = _name + ".publish"; + linkid.category = id.category; + linkid.name = _name + ".link"; } _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid); @@ -149,20 +149,20 @@ TopicI::TopicI( // for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity()); - } - - // - // Create the subscriber object add it to the set of - // subscribers. - // - SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost); - _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity()); + } + + // + // Create the subscriber object add it to the set of + // subscribers. + // + SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost); + _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } } @@ -193,11 +193,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, { while(start != end) { - if(*start == ident) - { - return start; - } - ++start; + if(*start == ident) + { + return start; + } + ++start; } return end; } @@ -211,74 +211,74 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr QoS qos = origQoS; if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); - if(traceLevels->topic > 1) - { - out << " QoS: "; - for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) - { - if(p != qos.begin()) - { - out << ','; - } - out << '[' << p->first << "," << p->second << ']'; - } - } + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Subscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } } string reliability = "oneway"; { - QoS::iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - qos.erase(p); - } + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } } Ice::ObjectPrx newObj = obj; if(reliability == "batch") { - if(newObj->ice_isDatagram()) - { - newObj = newObj->ice_batchDatagram(); - } - else - { - newObj = newObj->ice_batchOneway(); - } + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } } else if(reliability == "twoway") { - newObj = newObj->ice_twoway(); + newObj = newObj->ice_twoway(); } else if(reliability == "twoway ordered") { - qos["reliability"] = "ordered"; - newObj = newObj->ice_twoway(); + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); } else // reliability == "oneway" { - if(reliability != "oneway" && traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << reliability <<" mode not understood."; - } - if(!newObj->ice_isDatagram()) - { - newObj = newObj->ice_oneway(); - } + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } } IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - (*p)->destroy(); - _instance->subscriberPool()->remove(*p); - _subscribers.erase(p); + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); } SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos); @@ -293,27 +293,27 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); - if(traceLevels->topic > 1) - { - out << " QoS: "; - for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) - { - if(p != qos.begin()) - { - out << ','; - } - out << '[' << p->first << "," << p->second << ']'; - } - } + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Subscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } } IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - throw AlreadySubscribed(); + throw AlreadySubscribed(); } SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); @@ -329,20 +329,20 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(!subscriber) { - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "unsubscribe with null subscriber."; - } - return; + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "unsubscribe with null subscriber."; + } + return; } Ice::Identity id = subscriber->ice_getIdentity(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Unsubscribe: " << _instance->communicator()->identityToString(id); + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Unsubscribe: " << _instance->communicator()->identityToString(id); } // @@ -367,7 +367,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } reap(); @@ -379,20 +379,20 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) // link. for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - LinkExists ex; - ex.name = name; - throw ex; - } + if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) + { + LinkExists ex; + ex.name = name; + throw ex; + } } TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " link " << _instance->communicator()->identityToString(id) - << " cost " << cost; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " link " << _instance->communicator()->identityToString(id) + << " cost " << cost; } SubscriberPtr subscriber = Subscriber::create(_instance, link, cost); @@ -422,7 +422,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } reap(); @@ -433,24 +433,24 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) LinkRecordSeq::iterator p = _topicRecord.begin(); while(p != _topicRecord.end()) { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - break; - } - ++p; + if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) + { + break; + } + ++p; } if(p == _topicRecord.end()) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << name << " failed - not linked"; - } + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << name << " failed - not linked"; + } - NoSuchLink ex; - ex.name = name; - throw ex; + NoSuchLink ex; + ex.name = name; + throw ex; } Ice::ObjectPrx subscriber = p->obj; @@ -464,8 +464,8 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << _instance->communicator()->identityToString(id); + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << _instance->communicator()->identityToString(id); } removeSubscriber(subscriber); } @@ -481,11 +481,11 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q) { - LinkInfo info; - info.name = identityToTopicName(q->theTopic->ice_getIdentity()); - info.cost = q->cost; - info.theTopic = q->theTopic; - seq.push_back(info); + LinkInfo info; + info.name = identityToTopicName(q->theTopic->ice_getIdentity()); + info.cost = q->cost; + info.theTopic = q->theTopic; + seq.push_back(info); } return seq; @@ -498,18 +498,18 @@ TopicI::destroy(const Ice::Current&) if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } _destroyed = true; try { - _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity()); - _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity()); + _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity()); + _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity()); } catch(const Ice::ObjectAdapterDeactivatedException&) { - // Ignore -- this could occur on shutdown. + // Ignore -- this could occur on shutdown. } } @@ -533,7 +533,7 @@ TopicI::reap() IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - return; + return; } bool updated = false; @@ -543,46 +543,46 @@ TopicI::reap() // list<SubscriberPtr> error; { - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.swap(error); + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.swap(error); } TraceLevelsPtr traceLevels = _instance->traceLevels(); for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p) { - SubscriberPtr subscriber = *p; - assert(subscriber->persistent()); // Only persistent subscribers need to be reaped. - - bool found = false; - // - // If this turns out to be a performance problem then we - // can create an in memory map cache. - // - LinkRecordSeq::iterator q = _topicRecord.begin(); - while(q != _topicRecord.end()) - { - if(q->obj->ice_getIdentity() == subscriber->id()) - { - _topicRecord.erase(q); - updated = true; - found = true; - break; - } - ++q; - } - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "reaping " << _instance->communicator()->identityToString(subscriber->id()); - if(!found) - { - out << ": failed - not in database"; - } - } + SubscriberPtr subscriber = *p; + assert(subscriber->persistent()); // Only persistent subscribers need to be reaped. + + bool found = false; + // + // If this turns out to be a performance problem then we + // can create an in memory map cache. + // + LinkRecordSeq::iterator q = _topicRecord.begin(); + while(q != _topicRecord.end()) + { + if(q->obj->ice_getIdentity() == subscriber->id()) + { + _topicRecord.erase(q); + updated = true; + found = true; + break; + } + ++q; + } + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "reaping " << _instance->communicator()->identityToString(subscriber->id()); + if(!found) + { + out << ": failed - not in database"; + } + } } if(updated) { - _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); + _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); } } @@ -595,8 +595,8 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) // vector<SubscriberPtr> copy; { - IceUtil::Mutex::Lock sync(_subscribersMutex); - copy = _subscribers; + IceUtil::Mutex::Lock sync(_subscribersMutex); + copy = _subscribers; } // @@ -608,18 +608,18 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) list<SubscriberPtr> flush; for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) { - Subscriber::QueueState state = (*p)->queue(forwarded, events); - switch(state) - { - case Subscriber::QueueStateError: - e.push_back((*p)->id()); - break; - case Subscriber::QueueStateFlush: - flush.push_back(*p); - break; - case Subscriber::QueueStateNoFlush: - break; - } + Subscriber::QueueState state = (*p)->queue(forwarded, events); + switch(state) + { + case Subscriber::QueueStateError: + e.push_back((*p)->id()); + break; + case Subscriber::QueueStateFlush: + flush.push_back(*p); + break; + case Subscriber::QueueStateNoFlush: + break; + } } // @@ -627,7 +627,7 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) // if(!flush.empty()) { - _instance->subscriberPool()->flush(flush); + _instance->subscriberPool()->flush(flush); } // @@ -637,48 +637,48 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) list<SubscriberPtr> reap; if(!e.empty()) { - IceUtil::Mutex::Lock sync(_subscribersMutex); - for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) - { - // - // Its possible for the subscriber to already have been - // removed since the copy is iterated over outside of - // mutex protection. - // - // Note that although this could be quicker if we used a - // map, the most optimal case should be pushing around - // events not searching for a particular subscriber. - // - // The subscriber is immediately destroyed & removed from - // the _subscribers list. If the subscriber is persistent - // its added to an list of error'd subscribers and removed - // from the database on the next reap. - // - vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); - if(q != _subscribers.end()) - { - // - // Destroy the subscriber in any case. - // - (*q)->destroy(); - if((*q)->persistent()) - { - reap.push_back(*q); - } - _instance->subscriberPool()->remove(*q); - _subscribers.erase(q); - } - } + IceUtil::Mutex::Lock sync(_subscribersMutex); + for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) + { + // + // Its possible for the subscriber to already have been + // removed since the copy is iterated over outside of + // mutex protection. + // + // Note that although this could be quicker if we used a + // map, the most optimal case should be pushing around + // events not searching for a particular subscriber. + // + // The subscriber is immediately destroyed & removed from + // the _subscribers list. If the subscriber is persistent + // its added to an list of error'd subscribers and removed + // from the database on the next reap. + // + vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); + if(q != _subscribers.end()) + { + // + // Destroy the subscriber in any case. + // + (*q)->destroy(); + if((*q)->persistent()) + { + reap.push_back(*q); + } + _instance->subscriberPool()->remove(*q); + _subscribers.erase(q); + } + } } if(!reap.empty()) { - // - // This is why _error is a list, so we can splice on the - // reaped subscribers. - // - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.splice(_error.begin(), reap); + // + // This is why _error is a list, so we can splice on the + // reaped subscribers. + // + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.splice(_error.begin(), reap); } } @@ -691,10 +691,10 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj) vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - (*p)->destroy(); - _instance->subscriberPool()->remove(*p); - _subscribers.erase(p); - return; + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); + return; } // @@ -703,7 +703,7 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(id) << ": not subscribed."; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(id) << ": not subscribed."; } } |