diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 8dcdbd9f64c..f5e40eb76a6 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -149,25 +149,6 @@ private: const TopicLinkPrx _obj; }; -class ResponseTimerTask : public IceUtil::TimerTask -{ -public: - ResponseTimerTask(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - runTimerTask() - { - _subscriber->flush(); - } - -private: - - const SubscriberPtr _subscriber; -}; - class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke, public Ice::AMISentCallback { public: @@ -309,14 +290,30 @@ SubscriberBatch::doFlush() { _lock.notify(); } + + // + // If the subscriber isn't online we're done. + // + if(_state != SubscriberStateOnline) + { + return; + } v.swap(_events); assert(!v.empty()); } - vector<Ice::Byte> dummy; - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + try { - _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + vector<Ice::Byte> dummy; + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } + } + catch(const Ice::Exception& ex) + { + error(false, ex); + return; } _obj->ice_flushBatchRequests_async(new FlushBatchI(this)); @@ -398,7 +395,7 @@ SubscriberOneway::sent() { _lock.notify(); } - else if(_outstanding < _maxOutstanding && !_events.empty()) + else if(_outstanding <= 0 && !_events.empty()) { flush(); } @@ -420,7 +417,7 @@ void SubscriberTwoway::flush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // // If the subscriber isn't online we're done. // |