diff options
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 78 |
1 files changed, 46 insertions, 32 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index 4f1e568583c..ee3865bc74d 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -45,7 +45,7 @@ public: bool computeInterval = false; while(true) { - sub = _manager->dequeue(sub, requeue, interval, computeInterval); + _manager->dequeue(sub, requeue, interval, computeInterval); if(!sub) { return; @@ -154,15 +154,14 @@ SubscriberPoolMonitor::destroy() SubscriberPool::SubscriberPool(const InstancePtr& instance) : _instance(instance), - _sizeMax(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.SizeMax", 0)), - _sizeWarn(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.SizeWarn", 0)), - _size(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.Size", 1)), + _sizeMax(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeMax", 0)), + _sizeWarn(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeWarn", 0)), + _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)), + // minimum 50ms, default 1s. _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms. - _stallCheck(_timeout * 10), // 10 * the stall timeout. + "IceStorm.SubscriberPool.Timeout", 1000), 50))), + // 10 * the stall timeout. + _stallCheck(_timeout * 10), _destroy(false), _reap(0) { @@ -202,6 +201,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers) // Splice on the new set of subscribers to SubscriberPool. // _pending.splice(_pending.end(), subscribers); + assert(invariants()); notifyAll(); } @@ -210,6 +210,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber) { Lock sync(*this); _pending.push_back(subscriber); + assert(invariants()); notify(); } @@ -218,6 +219,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber) { Lock sync(*this); _subscribers.push_back(subscriber); + assert(invariants()); } void @@ -230,38 +232,32 @@ SubscriberPool::remove(const SubscriberPtr& subscriber) // its quite possible to have two subscribers with the same id in // the list. // - list<SubscriberPtr>::iterator p = _subscribers.begin(); - while(p != _subscribers.end()) - { - if((*p).get() == subscriber.get()) - { - _subscribers.erase(p); - return; - } - ++p; - } - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - Ice::Error err(traceLevels->logger); - err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id()); + list<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), subscriber); + assert(p != _subscribers.end()); + _subscribers.erase(p); } // // The passed subscriber need to be enqueued again. // -SubscriberPtr -SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval) +void +SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::Time& interval, bool& computeInterval) { Lock sync(*this); - if(sub) + if(subscriber) { if(requeue) { - _pending.push_back(sub); + _pending.push_back(subscriber); + assert(invariants()); } - sub->flushTime(interval); + subscriber->flushTime(interval); } + // + // Clear the reference. + // + subscriber = 0; // // The worker is no longer in use. @@ -351,7 +347,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T out << "destroying workers"; } ++_reap; - return 0; + return; } } } @@ -368,12 +364,12 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T if(_destroy) { - return 0; + return; } _lastDequeue = IceUtil::Time::now(); - SubscriberPtr subscriber = _pending.front(); + subscriber = _pending.front(); _pending.pop_front(); ++_inUse; @@ -395,7 +391,6 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T // stall threads. // computeInterval = (_workers.size() - _size) > 0; - return subscriber; } void @@ -477,3 +472,22 @@ SubscriberPool::check() _workers.push_back(new SubscriberPoolWorker(this)); } } + +bool +SubscriberPool::invariants() +{ + set<SubscriberPtr> subs; + list<SubscriberPtr>::const_iterator p; + for(p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + assert(subs.find(*p) == subs.end()); + subs.insert(*p); + } + subs.clear(); + for(p = _pending.begin(); p != _pending.end(); ++p) + { + assert(subs.find(*p) == subs.end()); + subs.insert(*p); + } + return true; +} |