diff options
author | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
---|---|---|
committer | Bernard Normier <bernard@zeroc.com> | 2007-02-01 17:09:49 +0000 |
commit | abada90e3f84dc703b8ddc9efcbed8a946fadead (patch) | |
tree | 2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/IceStorm/SubscriberPool.cpp | |
parent | removing trace message (diff) | |
download | ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2 ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip |
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 422 |
1 files changed, 211 insertions, 211 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index 42c92fd97cf..1d1fd773028 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -27,9 +27,9 @@ class SubscriberPoolWorker : public IceUtil::Thread public: SubscriberPoolWorker(const SubscriberPoolPtr& manager) : - _manager(manager) + _manager(manager) { - start(); + start(); } ~SubscriberPoolWorker() @@ -39,35 +39,35 @@ public: virtual void run() { - IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. - SubscriberPtr sub; - bool requeue = false; - bool computeInterval = false; - while(true) - { - _manager->dequeue(sub, requeue, interval, computeInterval); - if(!sub) - { - return; - } - - // - // If SubscriberPool returns true then the subscriber - // needs to be SubscriberPooled again, so therefore we - // will re-enqueue the subscriber in the call to dequeue. - // - if(computeInterval) - { - IceUtil::Time start = IceUtil::Time::now(); - requeue = sub->flush(); - interval = IceUtil::Time::now() - start; - } - else - { - requeue = sub->flush(); - interval = IceUtil::Time::seconds(24 * 60); // A long time. - } - } + IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. + SubscriberPtr sub; + bool requeue = false; + bool computeInterval = false; + while(true) + { + _manager->dequeue(sub, requeue, interval, computeInterval); + if(!sub) + { + return; + } + + // + // If SubscriberPool returns true then the subscriber + // needs to be SubscriberPooled again, so therefore we + // will re-enqueue the subscriber in the call to dequeue. + // + if(computeInterval) + { + IceUtil::Time start = IceUtil::Time::now(); + requeue = sub->flush(); + interval = IceUtil::Time::now() - start; + } + else + { + requeue = sub->flush(); + interval = IceUtil::Time::seconds(24 * 60); // A long time. + } + } } private: @@ -96,32 +96,32 @@ SubscriberPoolMonitor::run() for(;;) { { - Lock sync(*this); - if(_destroyed) - { - return; - } + Lock sync(*this); + if(_destroyed) + { + return; + } - if(_needCheck) - { - timedWait(_timeout); - // - // Monitoring was stopped. - // - if(!_needCheck) - { - continue; - } - if(_destroyed) - { - return; - } - } - else - { - wait(); - continue; - } + if(_needCheck) + { + timedWait(_timeout); + // + // Monitoring was stopped. + // + if(!_needCheck) + { + continue; + } + if(_destroyed) + { + return; + } + } + else + { + wait(); + continue; + } } // // Call outside of the lock to prevent any deadlocks. @@ -163,7 +163,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)), // minimum 50ms, default 1s. _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.Timeout", 1000), 50))), + "IceStorm.SubscriberPool.Timeout", 1000), 50))), // 10 * the stall timeout. _stallCheck(_timeout * 10), _destroyed(false), @@ -171,23 +171,23 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : { try { - __setNoDelete(true); - _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); - for(unsigned int i = 0; i < _size; ++i) - { - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); - } + __setNoDelete(true); + _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); + for(unsigned int i = 0; i < _size; ++i) + { + ++_inUse; + _workers.push_back(new SubscriberPoolWorker(this)); + } } catch(const IceUtil::Exception& ex) { - { - Ice::Error out(_traceLevels->logger); - out << "SubscriberPool: " << ex; - } - destroy(); - __setNoDelete(false); - throw; + { + Ice::Error out(_traceLevels->logger); + out << "SubscriberPool: " << ex; + } + destroy(); + __setNoDelete(false); + throw; } __setNoDelete(false); @@ -203,7 +203,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers) Lock sync(*this); if(_destroyed) { - return; + return; } // // Splice on the new set of subscribers to SubscriberPool. @@ -219,7 +219,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } _pending.push_back(subscriber); assert(invariants()); @@ -232,7 +232,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } _subscribers.push_back(subscriber); assert(invariants()); @@ -244,7 +244,7 @@ SubscriberPool::remove(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } // // Note that this cannot remove based on the subscriber id because @@ -267,18 +267,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: if(_destroyed) { - subscriber = 0; - return; + subscriber = 0; + return; } if(subscriber) { - if(requeue) - { - _pending.push_back(subscriber); - assert(invariants()); - } - subscriber->flushTime(interval); + if(requeue) + { + _pending.push_back(subscriber); + assert(invariants()); + } + subscriber->flushTime(interval); } // // Clear the reference. @@ -297,99 +297,99 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: if(_sizeMax != 1) { - // - // Reap dead workers, if necessary. - // - if(_reap > 0) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "reaping: " << _reap << " workers"; - } - list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); - while(p != _workers.end() && _reap > 0) - { - if(!(*p)->isAlive()) - { - (*p)->getThreadControl().join(); - p = _workers.erase(p); - --_reap; - } - else - { - ++p; - } - } - } - - // - // If we have extra workers every _stallCheck period we run - // through the complete set of subscribers and determine how - // many have stalled since the last check. If this number is - // less than the number of extra threads then we terminate the - // calling worker. - // - // - The flush time is protected by the subscriber pool mutex. - // - The flush time is only computed if we have extra threads, - // otherwise it is set to some large value. - // - The max flush time is reset to the next sending interval - // after after _stallCheck period. - // - Every subscriber is considered to be stalled iff it has - // never sent an event or we have just created the first - // additional worker. The first handles the case where a - // subscriber stalls for a long time on the first message - // send. The second means that we can disable computation of - // the flush latency if there are no additional threads. - // - if(_workers.size() > _size) - { - IceUtil::Time now = IceUtil::Time::now(); - if(now - _lastStallCheck > _stallCheck) - { - _lastStallCheck = now; - unsigned int stalls = 0; - for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) - { - if((*p)->pollMaxFlushTime(now) > _timeout) - { - ++stalls; - } - } - - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "checking stalls. extra workers: " << _workers.size() - _size - << " subscribers: " << _subscribers.size() << " stalls: " << stalls; - } - - if((_workers.size() - _size) > stalls) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "destroying workers"; - } - ++_reap; - return; - } - } - } + // + // Reap dead workers, if necessary. + // + if(_reap > 0) + { + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "reaping: " << _reap << " workers"; + } + list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); + while(p != _workers.end() && _reap > 0) + { + if(!(*p)->isAlive()) + { + (*p)->getThreadControl().join(); + p = _workers.erase(p); + --_reap; + } + else + { + ++p; + } + } + } + + // + // If we have extra workers every _stallCheck period we run + // through the complete set of subscribers and determine how + // many have stalled since the last check. If this number is + // less than the number of extra threads then we terminate the + // calling worker. + // + // - The flush time is protected by the subscriber pool mutex. + // - The flush time is only computed if we have extra threads, + // otherwise it is set to some large value. + // - The max flush time is reset to the next sending interval + // after after _stallCheck period. + // - Every subscriber is considered to be stalled iff it has + // never sent an event or we have just created the first + // additional worker. The first handles the case where a + // subscriber stalls for a long time on the first message + // send. The second means that we can disable computation of + // the flush latency if there are no additional threads. + // + if(_workers.size() > _size) + { + IceUtil::Time now = IceUtil::Time::now(); + if(now - _lastStallCheck > _stallCheck) + { + _lastStallCheck = now; + unsigned int stalls = 0; + for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + if((*p)->pollMaxFlushTime(now) > _timeout) + { + ++stalls; + } + } + + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "checking stalls. extra workers: " << _workers.size() - _size + << " subscribers: " << _subscribers.size() << " stalls: " << stalls; + } + + if((_workers.size() - _size) > stalls) + { + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "destroying workers"; + } + ++_reap; + return; + } + } + } } - + while(_pending.empty() && !_destroyed) { - // - // If we wait then there is no need to monitor anymore. - // - _subscriberPoolMonitor->stopMonitor(); - wait(); + // + // If we wait then there is no need to monitor anymore. + // + _subscriberPoolMonitor->stopMonitor(); + wait(); } if(_destroyed) { - return; + return; } _lastDequeue = IceUtil::Time::now(); @@ -405,11 +405,11 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: // if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1)) { - _subscriberPoolMonitor->startMonitor(); + _subscriberPoolMonitor->startMonitor(); } else { - _subscriberPoolMonitor->stopMonitor(); + _subscriberPoolMonitor->stopMonitor(); } // // We only need to compute the push interval if we've created @@ -429,22 +429,22 @@ SubscriberPool::destroy() // _destroyed is set. // { - Lock sync(*this); - _destroyed = true; - notifyAll(); - if(_subscriberPoolMonitor) - { - _subscriberPoolMonitor->destroy(); - } - _subscribers.clear(); - _pending.clear(); + Lock sync(*this); + _destroyed = true; + notifyAll(); + if(_subscriberPoolMonitor) + { + _subscriberPoolMonitor->destroy(); + } + _subscribers.clear(); + _pending.clear(); } // // Next join with each worker. // for(list<IceUtil::ThreadPtr>::const_iterator p = _workers.begin(); p != _workers.end(); ++p) { - (*p)->getThreadControl().join(); + (*p)->getThreadControl().join(); } _workers.clear(); @@ -455,8 +455,8 @@ SubscriberPool::destroy() // if(_subscriberPoolMonitor) { - _subscriberPoolMonitor->getThreadControl().join(); - _subscriberPoolMonitor = 0; + _subscriberPoolMonitor->getThreadControl().join(); + _subscriberPoolMonitor = 0; } } @@ -466,7 +466,7 @@ SubscriberPool::check() Lock sync(*this); if(_destroyed) { - return; + return; } IceUtil::Time now = IceUtil::Time::now(); @@ -474,34 +474,34 @@ SubscriberPool::check() /* if(_traceLevels->subscriberPool > 1) { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "check called: interval: " << interval << " timeout: " << _timeout - << " pending: " << _pending.size() << " running: " << _workers.size() - << " sizeMax: " << _sizeMax; + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "check called: interval: " << interval << " timeout: " << _timeout + << " pending: " << _pending.size() << " running: " << _workers.size() + << " sizeMax: " << _sizeMax; } */ if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0)) { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "detected stall: creating thread: threads: " << _workers.size(); - } - - // - // We'll now start stall checking at regular intervals if this - // is the first newly created worker. Here we need to - // initially set the stall check and the number of requests at - // this point. - // - if(_workers.size() == _size) - { - _lastStallCheck = now; - } - - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "detected stall: creating thread: threads: " << _workers.size(); + } + + // + // We'll now start stall checking at regular intervals if this + // is the first newly created worker. Here we need to + // initially set the stall check and the number of requests at + // this point. + // + if(_workers.size() == _size) + { + _lastStallCheck = now; + } + + ++_inUse; + _workers.push_back(new SubscriberPoolWorker(this)); } } @@ -512,14 +512,14 @@ SubscriberPool::invariants() list<SubscriberPtr>::const_iterator p; for(p = _subscribers.begin(); p != _subscribers.end(); ++p) { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); + assert(subs.find(*p) == subs.end()); + subs.insert(*p); } subs.clear(); for(p = _pending.begin(); p != _pending.end(); ++p) { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); + assert(subs.find(*p) == subs.end()); + subs.insert(*p); } return true; } |