diff options
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 531 |
1 files changed, 0 insertions, 531 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp deleted file mode 100644 index 8f71d971657..00000000000 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ /dev/null @@ -1,531 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -#include <IceStorm/SubscriberPool.h> -#include <IceStorm/Instance.h> -#include <IceStorm/TraceLevels.h> -#include <IceStorm/Subscriber.h> - -#include <Ice/Communicator.h> -#include <Ice/Properties.h> -#include <Ice/LoggerUtil.h> - -using namespace IceStorm; -using namespace std; - -namespace -{ - -class SubscriberPoolWorker : public IceUtil::Thread -{ -public: - - SubscriberPoolWorker(const SubscriberPoolPtr& manager) : - _manager(manager) - { - start(); - } - - ~SubscriberPoolWorker() - { - } - - virtual void - run() - { - IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. - SubscriberPtr sub; - bool requeue = false; - bool computeInterval = false; - while(true) - { - _manager->dequeue(sub, requeue, interval, computeInterval); - if(!sub) - { - return; - } - - // - // If flush returns true then the subscriber needs to be - // flushed again, so therefore we will re-enqueue the - // subscriber in the call to dequeue. - // - if(computeInterval) - { - IceUtil::Time start = IceUtil::Time::now(IceUtil::Time::Monotonic); - requeue = sub->flush(); - interval = IceUtil::Time::now(IceUtil::Time::Monotonic) - start; - } - else - { - requeue = sub->flush(); - interval = IceUtil::Time::seconds(24 * 60); // A long time. - } - } - } - -private: - - const SubscriberPoolPtr _manager; -}; - -} - -SubscriberPoolMonitor::SubscriberPoolMonitor(const SubscriberPoolPtr& manager, const IceUtil::Time& timeout) : - _manager(manager), - _timeout(timeout), - _needCheck(false), - _destroyed(false) -{ - start(); -} - -SubscriberPoolMonitor::~SubscriberPoolMonitor() -{ -} - -void -SubscriberPoolMonitor::run() -{ - for(;;) - { - { - Lock sync(*this); - if(_destroyed) - { - return; - } - - if(_needCheck) - { - timedWait(_timeout); - // - // Monitoring was stopped. - // - if(!_needCheck) - { - continue; - } - if(_destroyed) - { - return; - } - } - else - { - wait(); - continue; - } - } - // - // Call outside of the lock to prevent any deadlocks. - // - _manager->check(); - } -} - -void -SubscriberPoolMonitor::startMonitor() -{ - Lock sync(*this); - if(!_needCheck) - { - _needCheck = true; - notify(); - } -} - -void -SubscriberPoolMonitor::stopMonitor() -{ - Lock sync(*this); - _needCheck = false; -} - -void -SubscriberPoolMonitor::destroy() -{ - Lock sync(*this); - _destroyed = true; - notify(); -} - -SubscriberPool::SubscriberPool(const InstancePtr& instance) : - _traceLevels(instance->traceLevels()), - _sizeMax(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeMax", 0)), - _sizeWarn(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeWarn", 0)), - _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)), - // minimum 50ms, default 1s. - _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.Timeout", 1000), 50))), - // 10 * the stall timeout. - _stallCheck(_timeout * 10), - _destroyed(false), - _reap(0), - _inUse(0) -{ - try - { - __setNoDelete(true); - _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); - for(unsigned int i = 0; i < _size; ++i) - { - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); - } - } - catch(const IceUtil::Exception& ex) - { - { - Ice::Error out(_traceLevels->logger); - out << "SubscriberPool: " << ex; - } - destroy(); - __setNoDelete(false); - throw; - } - - __setNoDelete(false); -} - -SubscriberPool::~SubscriberPool() -{ -} - -void -SubscriberPool::flush(list<SubscriberPtr>& subscribers) -{ - Lock sync(*this); - if(_destroyed) - { - return; - } - // - // Splice on the new set of subscribers to SubscriberPool. - // - _pending.splice(_pending.end(), subscribers); - assert(invariants()); - notifyAll(); -} - -void -SubscriberPool::flush(const SubscriberPtr& subscriber) -{ - Lock sync(*this); - if(_destroyed) - { - return; - } - _pending.push_back(subscriber); - assert(invariants()); - notify(); -} - -void -SubscriberPool::add(const SubscriberPtr& subscriber) -{ - Lock sync(*this); - if(_destroyed) - { - return; - } - _subscribers.push_back(subscriber); - assert(invariants()); -} - -void -SubscriberPool::remove(const SubscriberPtr& subscriber) -{ - Lock sync(*this); - if(_destroyed) - { - return; - } - // - // Note that this cannot remove based on the subscriber id because - // the pool is TopicManager scoped and not topic scoped therefore - // its quite possible to have two subscribers with the same id in - // the list. - // - list<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), subscriber); - assert(p != _subscribers.end()); - _subscribers.erase(p); -} - -// -// The passed subscriber need to be enqueued again. -// -void -SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::Time& interval, bool& computeInterval) -{ - Lock sync(*this); - - if(_destroyed) - { - subscriber = 0; - return; - } - - if(subscriber) - { - if(requeue) - { - _pending.push_back(subscriber); - // - // Its necessary to notify here since this thread might go - // on and kill itself in which case if another worker is - // in wait() it will not wake up and process the subscriber. - // - notify(); - assert(invariants()); - } - subscriber->flushTime(interval); - } - // - // Clear the reference. - // - subscriber = 0; - - // - // The worker is no longer in use. - // - --_inUse; - - // - // If _sizeMax is 1 we never spawn up new threads if a stall is - // detected. - // - if(_sizeMax != 1) - { - // - // Reap dead workers, if necessary. - // - if(_reap > 0) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "reaping: " << _reap << " workers"; - } - list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); - while(p != _workers.end() && _reap > 0) - { - if(!(*p)->isAlive()) - { - (*p)->getThreadControl().join(); - p = _workers.erase(p); - --_reap; - } - else - { - ++p; - } - } - } - - // - // If we have extra workers every _stallCheck period we run - // through the complete set of subscribers and determine how - // many have stalled since the last check. If this number is - // less than the number of extra threads then we terminate the - // calling worker. - // - // - The flush time is protected by the subscriber pool mutex. - // - The flush time is only computed if we have extra threads, - // otherwise it is set to some large value. - // - The max flush time is reset to the next sending interval - // after after _stallCheck period. - // - Every subscriber is considered to be stalled iff it has - // never sent an event or we have just created the first - // additional worker. The first handles the case where a - // subscriber stalls for a long time on the first message - // send. The second means that we can disable computation of - // the flush latency if there are no additional threads. - // - if(_workers.size() > _size) - { - IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); - if(now - _lastStallCheck > _stallCheck) - { - _lastStallCheck = now; - unsigned int stalls = 0; - for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) - { - if((*p)->pollMaxFlushTime(now) > _timeout) - { - ++stalls; - } - } - - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "checking stalls. extra workers: " << _workers.size() - _size - << " subscribers: " << _subscribers.size() << " stalls: " << stalls; - } - - if((_workers.size() - _size) > stalls) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "destroying workers"; - } - ++_reap; - return; - } - } - } - } - - while(_pending.empty() && !_destroyed) - { - // - // If we wait then there is no need to monitor anymore. - // - _subscriberPoolMonitor->stopMonitor(); - wait(); - } - - if(_destroyed) - { - return; - } - - _lastDequeue = IceUtil::Time::now(IceUtil::Time::Monotonic); - - subscriber = _pending.front(); - _pending.pop_front(); - - ++_inUse; - - // - // If all threads are now in use then we need to start the - // monitoring, otherwise we don't need to monitor. - // - if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1)) - { - _subscriberPoolMonitor->startMonitor(); - } - else - { - _subscriberPoolMonitor->stopMonitor(); - } - // - // We only need to compute the push interval if we've created - // stall threads. - // - computeInterval = (_workers.size() - _size) > 0; -} - -void -SubscriberPool::destroy() -{ - // - // First mark the pool as destroyed. This causes all of the worker - // threads to unblock and terminate. We also clear the set of - // subscribers here since there is a cycle (instance -> pool -> - // subscribers -> instance). No new subscribers can be added once - // _destroyed is set. - // - { - Lock sync(*this); - _destroyed = true; - notifyAll(); - if(_subscriberPoolMonitor) - { - _subscriberPoolMonitor->destroy(); - } - _subscribers.clear(); - _pending.clear(); - } - // - // Next join with each worker. - // - for(list<IceUtil::ThreadPtr>::const_iterator p = _workers.begin(); p != _workers.end(); ++p) - { - (*p)->getThreadControl().join(); - } - _workers.clear(); - - // - // Once all of the workers have gone then we'll no longer have - // concurrent access to the pool monitor, so we can join with it - // and then clear to remove the circular reference count. - // - if(_subscriberPoolMonitor) - { - _subscriberPoolMonitor->getThreadControl().join(); - _subscriberPoolMonitor = 0; - } -} - -void -SubscriberPool::check() -{ - Lock sync(*this); - if(_destroyed) - { - return; - } - - IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); - IceUtil::Time interval = now - _lastDequeue; -/* - if(_traceLevels->subscriberPool > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "check called: interval: " << interval << " timeout: " << _timeout - << " pending: " << _pending.size() << " running: " << _workers.size() - << " sizeMax: " << _sizeMax; - } -*/ - - if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0)) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "detected stall: creating thread: threads: " << _workers.size(); - } - - // - // We'll now start stall checking at regular intervals if this - // is the first newly created worker. Here we need to - // initially set the stall check and the number of requests at - // this point. - // - if(_workers.size() == _size) - { - _lastStallCheck = now; - } - - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); - } -} - -bool -SubscriberPool::invariants() -{ - set<SubscriberPtr> subs; - list<SubscriberPtr>::const_iterator p; - for(p = _subscribers.begin(); p != _subscribers.end(); ++p) - { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); - } - subs.clear(); - for(p = _pending.begin(); p != _pending.end(); ++p) - { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); - } - return true; -} |