Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 242
1 files changed, 171 insertions, 71 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 6f42665af37..7943d3e1129 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -19,7 +19,7 @@
 using namespace IceStorm;
 using namespace std;
 
-namespace IceStorm
+namespace
 {
 
 class SubscriberPoolWorker : public IceUtil::Thread
@@ -39,22 +39,33 @@ public:
 
     virtual void run()
     {
+        IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
         SubscriberPtr sub;
+        bool requeue = false;
+        bool computeInterval = false;
         while(true)
         {
-            sub = _manager->dequeue(sub);
+            sub = _manager->dequeue(sub, requeue, interval, computeInterval);
             if(!sub)
             {
                 return;
             }
+
             //
-            // If SubscriberPool returns true then the subscriber needs to be
-            // SubscriberPooled again, so therefore we will re-enqueue the
-            // subscriber in the call to dequeue.
+            // If SubscriberPool returns true then the subscriber
+            // needs to be SubscriberPooled again, so therefore we
+            // will re-enqueue the subscriber in the call to dequeue.
             //
-            if(!sub->flush())
+            if(computeInterval)
+            {
+                IceUtil::Time start = IceUtil::Time::now();
+                requeue = sub->flush();
+                interval = IceUtil::Time::now() - start;
+            }
+            else
             {
-                sub = 0;
+                requeue = sub->flush();
+                interval = IceUtil::Time::seconds(24 * 60); // A long time.
             }
         }
     }
@@ -144,25 +155,23 @@ SubscriberPoolMonitor::destroy()
 SubscriberPool::SubscriberPool(const InstancePtr& instance) :
     _instance(instance),
     _sizeMax(instance->properties()->getPropertyAsIntWithDefault(
-        "IceStorm.SubscriberPool.SizeMax", -1)),
+        "IceStorm.SubscriberPool.SizeMax", 0)),
     _sizeWarn(instance->properties()->getPropertyAsIntWithDefault(
         "IceStorm.SubscriberPool.SizeWarn", 0)),
     _size(instance->properties()->getPropertyAsIntWithDefault(
         "IceStorm.SubscriberPool.Size", 1)),
     _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
         "IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms.
+    _stallCheck(_timeout * 10), // 10 * the stall timeout.
     _destroy(false),
-    _inUse(0),
-    _running(0),
-    _load(1.0)
+    _reap(0)
 {
     try
     {
         __setNoDelete(true);
         _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
-        for(int i = 0; i < _size; ++i)
+        for(unsigned int i = 0; i < _size; ++i)
         {
-            ++_running;
             ++_inUse;
             _workers.push_back(new SubscriberPoolWorker(this));
         }
@@ -186,9 +195,9 @@ SubscriberPool::~SubscriberPool()
 }
 
 void
-SubscriberPool::add(list<SubscriberPtr>& subscribers)
+SubscriberPool::flush(list<SubscriberPtr>& subscribers)
 {
-    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+    Lock sync(*this);
     //
     // Splice on the new set of subscribers to SubscriberPool.
     //
@@ -196,76 +205,150 @@ SubscriberPool::add(list<SubscriberPtr>& subscribers)
     notifyAll();
 }
 
+void
+SubscriberPool::add(const SubscriberPtr& subscriber)
+{
+    Lock sync(*this);
+    _subscribers.push_back(subscriber);
+}
+
+void
+SubscriberPool::remove(const SubscriberPtr& subscriber)
+{
+    Lock sync(*this);
+    //
+    // Note that this cannot remove based on the subscriber id because
+    // the pool is TopicManager scoped and not topic scoped therefore
+    // its quite possible to have two subscribers with the same id in
+    // the list.
+    //
+    list<SubscriberPtr>::iterator p = _subscribers.begin();
+    while(p != _subscribers.end())
+    {
+        if((*p).get() == subscriber.get())
+        {
+            _subscribers.erase(p);
+            return;
+        }
+        ++p;
+    }
+
+    TraceLevelsPtr traceLevels = _instance->traceLevels();
+    Ice::Error err(traceLevels->logger);
+    err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id());
+}
+
 //
 // The passed subscriber need to be enqueued again.
 //
 SubscriberPtr
-SubscriberPool::dequeue(const SubscriberPtr& sub)
+SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval)
 {
-    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+    Lock sync(*this);
     if(sub)
     {
-        _pending.push_back(sub);
+        if(requeue)
+        {
+            _pending.push_back(sub);
+        }
+        sub->flushTime(interval);
     }
 
     //
-    // Now we check if this thread can be destroyed, based on a
-    // load factor.
+    // The worker is no longer in use.
     //
-    // The load factor jumps immediately to the number of threads
-    // that are currently in use, but decays exponentially if the
-    // number of threads in use is smaller than the load
-    // factor. This reflects that we create threads immediately
-    // when they are needed, but want the number of threads to
-    // slowly decline to the configured minimum.
-    //
-    double inUse = static_cast<double>(_inUse);
-    if(_load < inUse)
-    {
-        _load = inUse;
-    }
-    else
-    {
-        const double loadFactor = 0.05; // TODO: Configurable?
-        const double oneMinusLoadFactor = 1 - loadFactor;
-        _load = _load * oneMinusLoadFactor + inUse * loadFactor;
-    }
+    --_inUse;
 
-    if(_running > _size)
+    //
+    // If _sizeMax is 1 we never spawn up new threads if a stall is
+    // detected.
+    //
+    if(_sizeMax != 1)
     {
         TraceLevelsPtr traceLevels = _instance->traceLevels();
-        if(traceLevels->subscriberPool > 1)
-        {
-            IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
-            Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
-            out << "load check: " << _load;
-        }
-        int load = static_cast<int>(_load + 0.5);
 
-        if(load < _running)
+        //
+        // Reap dead workers, if necessary.
+        //
+        if(_reap > 0)
         {
-            assert(_inUse > 0);
-            --_inUse;
-
-            assert(_running > 0);
-            --_running;
-
             if(traceLevels->subscriberPool > 0)
             {
                 Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
-                out << "reducing SubscriberPool threads: load: " << _load << " threads: " << _running;
+                out << "reaping: " << _reap << " workers";
             }
-            return 0;
+            list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
+            while(p != _workers.end() && _reap > 0)
+            {
+                if(!(*p)->isAlive())
+                {
+                    (*p)->getThreadControl().join();
+                    p = _workers.erase(p);
+                    --_reap;
+                }
+                else
+                {
+                    ++p;
+                }
+            }
         }
+
+        //
+        // If we have extra workers every _stallCheck period we run
+        // through the complete set of subscribers and determine how
+        // many have stalled since the last check. If this number is
+        // less than the number of extra threads then we terminate the
+        // calling worker.
+        //
+        // - The flush time is protected by the subscriber pool mutex.
+        // - The flush time is only computed if we have extra threads,
+        //   otherwise it is set to some large value.
+        // - The max flush time is reset to the next sending interval
+        //   after after _stallCheck period.
+        // - Every subscriber is considered to be stalled iff it has
+        //   never sent an event or we have just created the first
+        //   additional worker. The first handles the case where a
+        //   subscriber stalls for a long time on the first message
+        //   send. The second means that we can disable computation of
+        //   the flush latency if there are no additional threads.
+        //
+        if(_workers.size() > _size)
+        {
+            IceUtil::Time now = IceUtil::Time::now();
+            if(now - _lastStallCheck > _stallCheck)
+            {
+                _lastStallCheck = now;
+                unsigned int stalls = 0;
+                for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+                {
+                    if((*p)->pollMaxFlushTime(now) > _timeout)
+                    {
+                        ++stalls;
+                    }
+                }
+
+                if(traceLevels->subscriberPool > 0)
+                {
+                    Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+                    out << "checking stalls. extra workers: " << _workers.size() - _size
+                        << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
+                }
+
+                if((_workers.size() - _size) > stalls)
+                {
+                    if(traceLevels->subscriberPool > 0)
+                    {
+                        Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+                        out << "destroying workers";
+                    }
+                    ++_reap;
+                    return 0;
+                }
+            }
+        }
     }
-
-    assert(_inUse > 0);
-    --_inUse;
-
+
     while(_pending.empty() && !_destroy)
     {
         //
@@ -277,11 +360,10 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
 
     if(_destroy)
     {
-        --_running;
         return 0;
     }
 
-    _lastNext = IceUtil::Time::now();
+    _lastDequeue = IceUtil::Time::now();
 
     SubscriberPtr subscriber = _pending.front();
     _pending.pop_front();
@@ -292,7 +374,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
     // If all threads are now in use then we need to start the
     // monitoring, otherwise we don't need to monitor.
    //
-    if(_inUse == _running && (_running < _sizeMax || _sizeMax == -1))
+    if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1))
     {
         _subscriberPoolMonitor->startMonitor();
     }
@@ -300,6 +382,11 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
     {
         _subscriberPoolMonitor->stopMonitor();
     }
+    //
+    // We only need to compute the push interval if we've created
+    // stall threads.
+    //
+    computeInterval = (_workers.size() - _size) > 0;
 
     return subscriber;
 }
@@ -311,7 +398,7 @@ SubscriberPool::destroy()
     // threads to unblock and terminate.
     //
     {
-        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+        Lock sync(*this);
         _destroy = true;
         notifyAll();
         if(_subscriberPoolMonitor)
@@ -347,24 +434,37 @@ SubscriberPool::check()
     Lock sync(*this);
 
     TraceLevelsPtr traceLevels = _instance->traceLevels();
-    IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
+    IceUtil::Time now = IceUtil::Time::now();
+    IceUtil::Time interval = now - _lastDequeue;
+/*
     if(traceLevels->subscriberPool > 1)
     {
         Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
         out << "check called: interval: " << interval << " timeout: " << _timeout
-            << " pending: " << _pending.size() << " running: " << _running
+            << " pending: " << _pending.size() << " running: " << _workers.size()
            << " sizeMax: " << _sizeMax;
     }
-
-    if(interval > _timeout && _pending.size() > 0 && (_running < _sizeMax || _sizeMax == -1))
+*/
+
+    if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
    {
         if(traceLevels->subscriberPool > 0)
         {
             Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
-            out << "detected stall: creating thread: load: " << _load << " threads: " << _running;
+            out << "detected stall: creating thread: threads: " << _workers.size();
         }
-        ++_running;
+        //
+        // We'll now start stall checking at regular intervals if this
+        // is the first newly created worker. Here we need to
+        // initially set the stall check and the number of requests at
+        // this point.
+        //
+        if(_workers.size() == _size)
+        {
+            _lastStallCheck = now;
+        }
+        ++_inUse;
         _workers.push_back(new SubscriberPoolWorker(this));
     }
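
The heart of the patch is the switch from the old load-factor heuristic to stall-based sizing: a worker only times Subscriber::flush() while extra workers exist, dequeue() records that interval against the subscriber, and once per _stallCheck period the pool counts how many subscribers flushed more slowly than the stall timeout. If there are more extra workers than stalled subscribers, the calling worker bumps _reap and returns, and a later dequeue() call joins and erases the dead thread. Below is a minimal standalone sketch of that accounting, not the IceStorm code itself: it uses std::chrono in place of IceUtil::Time, simplifies the poll (the real pollMaxFlushTime(now) also treats a never-flushed subscriber as stalled), and the names SubscriberStats and shouldReapWorker are illustrative only.

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <vector>

using Ms = std::chrono::milliseconds;

// Per-subscriber flush-time bookkeeping, an illustrative stand-in for the
// flushTime()/pollMaxFlushTime() pair this patch relies on.
struct SubscriberStats
{
    Ms maxFlushTime{0}; // worst flush time seen since the last stall check

    void recordFlush(Ms elapsed)
    {
        maxFlushTime = std::max(maxFlushTime, elapsed);
    }

    // Return the worst flush time for the period and reset it.
    Ms pollMaxFlushTime()
    {
        Ms t = maxFlushTime;
        maxFlushTime = Ms{0};
        return t;
    }
};

// Decide whether the calling worker should terminate: true when the number
// of extra workers exceeds the number of subscribers that stalled (flushed
// slower than the stall timeout) during the last check period.
bool shouldReapWorker(std::vector<SubscriberStats>& subscribers,
                      std::size_t workers,
                      std::size_t configuredSize,
                      Ms stallTimeout)
{
    if(workers <= configuredSize)
    {
        return false; // only the configured minimum is running
    }
    std::size_t stalls = 0;
    for(std::vector<SubscriberStats>::iterator p = subscribers.begin(); p != subscribers.end(); ++p)
    {
        if(p->pollMaxFlushTime() > stallTimeout)
        {
            ++stalls;
        }
    }
    return (workers - configuredSize) > stalls;
}

int main()
{
    std::vector<SubscriberStats> subscribers(3);
    subscribers[0].recordFlush(Ms{400}); // one slow subscriber
    subscribers[1].recordFlush(Ms{10});
    subscribers[2].recordFlush(Ms{5});

    // Three workers, configured size 1, stall timeout 250ms: two extra
    // workers but only one stall, so one extra worker can be reaped.
    std::cout << std::boolalpha
              << shouldReapWorker(subscribers, 3, 1, Ms{250}) << std::endl; // prints: true
    return 0;
}

Because stall detection rather than a decaying load average now drives both growth in check() and shrinkage in dequeue(), the pool only pays for flush-time measurement while extra workers are alive, which is what the computeInterval flag handed back to each worker controls.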