diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 504 |
1 files changed, 252 insertions, 252 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 9174eed5949..3162116566a 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -37,7 +37,7 @@ class PerSubscriberPublisherI : public Ice::BlobjectArray public: PerSubscriberPublisherI(const InstancePtr& instance) : - _instance(instance) + _instance(instance) { } @@ -48,36 +48,36 @@ public: void setSubscriber(const SubscriberPtr& subscriber) { - _subscriber = subscriber; + _subscriber = subscriber; } virtual bool ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - vector<Ice::Byte>&, - const Ice::Current& current) + vector<Ice::Byte>&, + const Ice::Current& current) { - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); + EventDataPtr event = new EventData( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); - Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); - EventDataSeq e; - e.push_back(event); - Subscriber::QueueState state = _subscriber->queue(false, e); + EventDataSeq e; + e.push_back(event); + Subscriber::QueueState state = _subscriber->queue(false, e); - if(state == Subscriber::QueueStateFlush) - { - _instance->subscriberPool()->flush(_subscriber); - } - return true; + if(state == Subscriber::QueueStateFlush) + { + _instance->subscriberPool()->flush(_subscriber); + } + return true; } private: @@ -188,7 +188,7 @@ SubscriberOneway::SubscriberOneway( if(_batch) { - _instance->batchFlusher()->add(_obj); + _instance->batchFlusher()->add(_obj); } } @@ -202,66 +202,66 @@ SubscriberOneway::flush() // if(_state == SubscriberStateError) { - return false; + return false; } assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); try { - // - // Get the current set of events, but release the lock before - // attempting to deliver the events. This allows other threads - // to add events in case we block (such as during connection - // establishment). - // - EventDataSeq v; - v.swap(_events); - sync.release(); - - // - // Deliver the events without holding the lock. - // - // If there are more than one event queued and we are not in - // batch sending mode then send the events as a batch and then - // flush immediately, otherwise send one at a time. - // - vector<Ice::Byte> dummy; - if(v.size() > 1 && !_batch) - { - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection(); - assert(conn); - conn->flushBatchRequests(); - } - else - { - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - } - - // - // Reacquire the lock before we check the queue again. - // - sync.acquire(); + // + // Get the current set of events, but release the lock before + // attempting to deliver the events. This allows other threads + // to add events in case we block (such as during connection + // establishment). + // + EventDataSeq v; + v.swap(_events); + sync.release(); + + // + // Deliver the events without holding the lock. + // + // If there are more than one event queued and we are not in + // batch sending mode then send the events as a batch and then + // flush immediately, otherwise send one at a time. + // + vector<Ice::Byte> dummy; + if(v.size() > 1 && !_batch) + { + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } + Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection(); + assert(conn); + conn->flushBatchRequests(); + } + else + { + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } + } + + // + // Reacquire the lock before we check the queue again. + // + sync.acquire(); } catch(const Ice::LocalException& ex) { - assert(!sync.acquired()); - // error will re-acquire and release the lock. - error(ex); - return false; + assert(!sync.acquired()); + // error will re-acquire and release the lock. + error(ex); + return false; } if(!_events.empty()) { - assert(_state == SubscriberStateFlushPending); - return true; + assert(_state == SubscriberStateFlushPending); + return true; } _state = SubscriberStateOnline; return false; @@ -272,7 +272,7 @@ SubscriberOneway::destroy() { if(_batch) { - _instance->batchFlusher()->remove(_obj); + _instance->batchFlusher()->remove(_obj); } Subscriber::destroy(); } @@ -285,7 +285,7 @@ class TwowayInvokeI : public Ice::AMI_Object_ice_invoke public: TwowayInvokeI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } @@ -297,7 +297,7 @@ public: virtual void ice_exception(const Ice::Exception& e) { - _subscriber->error(e); + _subscriber->error(e); } private: @@ -326,7 +326,7 @@ SubscriberTwoway::flush() // if(_state == SubscriberStateError) { - return false; + return false; } assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); @@ -346,7 +346,7 @@ SubscriberTwoway::flush() // for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) { - _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); + _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); } // @@ -360,8 +360,8 @@ SubscriberTwoway::flush() // if(!_events.empty()) { - assert(_state == SubscriberStateFlushPending); - return true; + assert(_state == SubscriberStateFlushPending); + return true; } _state = SubscriberStateOnline; return false; @@ -375,20 +375,20 @@ class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke public: TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } virtual void ice_response(bool, const std::vector<Ice::Byte>&) { - _subscriber->response(); + _subscriber->response(); } virtual void ice_exception(const Ice::Exception& ex) { - _subscriber->error(ex); + _subscriber->error(ex); } private: @@ -412,20 +412,20 @@ SubscriberTwowayOrdered::flush() { EventDataPtr e; { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - e = _events.front(); - _events.erase(_events.begin()); + IceUtil::Mutex::Lock sync(_mutex); + + // + // If the subscriber errored out then we're done. + // + if(_state == SubscriberStateError) + { + return false; + } + assert(_state == SubscriberStateFlushPending); + assert(!_events.empty()); + + e = _events.front(); + _events.erase(_events.begin()); } _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); @@ -441,8 +441,8 @@ SubscriberTwowayOrdered::response() assert(_state != SubscriberStateError); if(_events.empty()) { - _state = SubscriberStateOnline; - return; + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -455,31 +455,31 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward public: Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } virtual void ice_response() { - _subscriber->response(); + _subscriber->response(); } virtual void ice_exception(const Ice::Exception& ex) { - try - { - ex.ice_throw(); - } - catch(const Ice::ObjectNotExistException& ex) - { - _subscriber->error(ex); - } - catch(const Ice::LocalException& ex) - { - _subscriber->offline(ex); - } + try + { + ex.ice_throw(); + } + catch(const Ice::ObjectNotExistException& ex) + { + _subscriber->error(ex); + } + catch(const Ice::LocalException& ex) + { + _subscriber->offline(ex); + } } private: @@ -505,7 +505,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) { if(forwarded) { - return QueueStateNoFlush; + return QueueStateNoFlush; } // @@ -518,7 +518,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) if(_state == SubscriberStateError) { - return QueueStateError; + return QueueStateError; } // @@ -527,53 +527,53 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) // if(_state == SubscriberStateOffline) { - // - // If there are alot of subscribers offline then we will call - // Time::now() alot, which could be costly. This could be - // optimized to only one per event-batch by making the - // forwarded argument an EventInfo thing where the queue-time - // is lazy initialized. - // - if(IceUtil::Time::now() < _next) - { - return QueueStateNoFlush; - } - - // - // State transition to online. - // - _state = SubscriberStateOnline; + // + // If there are alot of subscribers offline then we will call + // Time::now() alot, which could be costly. This could be + // optimized to only one per event-batch by making the + // forwarded argument an EventInfo thing where the queue-time + // is lazy initialized. + // + if(IceUtil::Time::now() < _next) + { + return QueueStateNoFlush; + } + + // + // State transition to online. + // + _state = SubscriberStateOnline; } int queued = 0; for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p) { - if(_cost != 0) - { - // - // Note that we could calculate this cost once and cache - // it in a private form of the event to avoid this if this - // really is a performance problem (this could use the - // EventInfo thing discussed above). - // - int cost = 0; - Ice::Context::const_iterator q = (*p)->context.find("cost"); - if(q != (*p)->context.end()) - { - cost = atoi(q->second.c_str()); - } - if(cost > _cost) - { - continue; - } - } - ++queued; - _events.push_back(*p); + if(_cost != 0) + { + // + // Note that we could calculate this cost once and cache + // it in a private form of the event to avoid this if this + // really is a performance problem (this could use the + // EventInfo thing discussed above). + // + int cost = 0; + Ice::Context::const_iterator q = (*p)->context.find("cost"); + if(q != (*p)->context.end()) + { + cost = atoi(q->second.c_str()); + } + if(cost > _cost) + { + continue; + } + } + ++queued; + _events.push_back(*p); } if(_state == SubscriberStateFlushPending || queued == 0) { - return QueueStateNoFlush; + return QueueStateNoFlush; } _state = SubscriberStateFlushPending; return QueueStateFlush; @@ -584,20 +584,20 @@ SubscriberLink::flush() { EventDataSeq v; { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } + IceUtil::Mutex::Lock sync(_mutex); + + // + // If the subscriber errored out then we're done. + // + if(_state == SubscriberStateError) + { + return false; + } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - v.swap(_events); + assert(_state == SubscriberStateFlushPending); + assert(!_events.empty()); + + v.swap(_events); } _obj->forward_async(new Topiclink_forwardI(this), v); @@ -622,8 +622,8 @@ SubscriberLink::response() // if(_events.empty()) { - _state = SubscriberStateOnline; - return; + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -639,18 +639,18 @@ SubscriberLink::offline(const Ice::Exception& e) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(_warn) { - Ice::Warning warn(traceLevels->logger); - warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) - << ": link offline: " << e; + Ice::Warning warn(traceLevels->logger); + warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) + << ": link offline: " << e; } else { - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": link offline: " << e - << " discarding events: " << _instance->discardInterval() << "s"; - } + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": link offline: " << e + << " discarding events: " << _instance->discardInterval() << "s"; + } } _state = SubscriberStateOffline; @@ -675,57 +675,57 @@ Subscriber::create( try { - string reliability; - QoS::const_iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - } - if(!reliability.empty() && reliability != "ordered") - { - throw BadQoS("invalid reliability: " + reliability); - } - - // - // Override the timeout. - // - Ice::ObjectPrx newObj; - try - { - newObj = obj->ice_timeout(instance->sendTimeout()); - } - catch(const Ice::FixedProxyException&) - { - // - // In the event IceStorm is collocated this could be a - // fixed proxy in which case its not possible to set the - // timeout. - // - newObj = obj; - } - if(reliability == "ordered") - { - if(!newObj->ice_isTwoway()) - { - throw BadQoS("ordered reliability requires a twoway proxy"); - } - subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); - } - else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || - newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) - { - subscriber = new SubscriberOneway(instance, proxy, newObj); - } - else if(newObj->ice_isTwoway()) - { - subscriber = new SubscriberTwoway(instance, proxy, newObj); - } - per->setSubscriber(subscriber); + string reliability; + QoS::const_iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + } + if(!reliability.empty() && reliability != "ordered") + { + throw BadQoS("invalid reliability: " + reliability); + } + + // + // Override the timeout. + // + Ice::ObjectPrx newObj; + try + { + newObj = obj->ice_timeout(instance->sendTimeout()); + } + catch(const Ice::FixedProxyException&) + { + // + // In the event IceStorm is collocated this could be a + // fixed proxy in which case its not possible to set the + // timeout. + // + newObj = obj; + } + if(reliability == "ordered") + { + if(!newObj->ice_isTwoway()) + { + throw BadQoS("ordered reliability requires a twoway proxy"); + } + subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); + } + else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || + newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) + { + subscriber = new SubscriberOneway(instance, proxy, newObj); + } + else if(newObj->ice_isTwoway()) + { + subscriber = new SubscriberTwoway(instance, proxy, newObj); + } + per->setSubscriber(subscriber); } catch(const Ice::Exception&) { - instance->objectAdapter()->remove(proxy->ice_getIdentity()); - throw; + instance->objectAdapter()->remove(proxy->ice_getIdentity()); + throw; } return subscriber; @@ -738,9 +738,9 @@ Subscriber::create( int cost) { return new SubscriberLink( - instance, - TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), - cost); + instance, + TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), + cost); } Subscriber::~Subscriber() @@ -772,13 +772,13 @@ Subscriber::queue(bool, const EventDataSeq& events) if(_state == SubscriberStateError) { - return QueueStateError; + return QueueStateError; } copy(events.begin(), events.end(), back_inserter(_events)); if(_state == SubscriberStateFlushPending) { - return QueueStateNoFlush; + return QueueStateNoFlush; } _state = SubscriberStateFlushPending; @@ -793,18 +793,18 @@ Subscriber::destroy() // if(_proxy) { - try - { - _instance->objectAdapter()->remove(_proxy->ice_getIdentity()); - } - catch(const Ice::NotRegisteredException&) - { - // Ignore - } - catch(const Ice::ObjectAdapterDeactivatedException&) - { - // Ignore - } + try + { + _instance->objectAdapter()->remove(_proxy->ice_getIdentity()); + } + catch(const Ice::NotRegisteredException&) + { + // Ignore + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + // Ignore + } } } @@ -813,9 +813,9 @@ Subscriber::flushTime(const IceUtil::Time& interval) { if(_resetMax || interval > _maxSend) { - assert(interval != IceUtil::Time()); - _resetMax = false; - _maxSend = interval; + assert(interval != IceUtil::Time()); + _resetMax = false; + _maxSend = interval; } } @@ -833,15 +833,15 @@ Subscriber::error(const Ice::Exception& e) IceUtil::Mutex::Lock sync(_mutex); if(_state != SubscriberStateError) { - _state = SubscriberStateError; - _events.clear(); - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e; - } + _state = SubscriberStateError; + _events.clear(); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e; + } } } |