diff options
author | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 09:53:45 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 09:53:45 +0000 |
commit | 2ee5941662889fca72b75379844d04ba61865081 (patch) | |
tree | 386ae9824dbde1846a2725149741e0f442db569a /cpp/src/IceStorm/SubscriberPool.cpp | |
parent | Added shutdown state. (diff) | |
download | ice-2ee5941662889fca72b75379844d04ba61865081.tar.bz2 ice-2ee5941662889fca72b75379844d04ba61865081.tar.xz ice-2ee5941662889fca72b75379844d04ba61865081.zip |
Removed Shutdown state.
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 68 |
1 files changed, 20 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index f118874295f..126a5b7fdd6 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -112,6 +112,10 @@ SubscriberPoolMonitor::run() { continue; } + if(_destroyed) + { + return; + } } else { @@ -153,7 +157,7 @@ SubscriberPoolMonitor::destroy() } SubscriberPool::SubscriberPool(const InstancePtr& instance) : - _instance(instance), + _traceLevels(instance->traceLevels()), _sizeMax(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeMax", 0)), _sizeWarn(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeWarn", 0)), _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)), @@ -178,7 +182,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : catch(const IceUtil::Exception& ex) { { - Ice::Error out(_instance->traceLevels()->logger); + Ice::Error out(_traceLevels->logger); out << "SubscriberPool: " << ex; } destroy(); @@ -197,10 +201,6 @@ void SubscriberPool::flush(list<SubscriberPtr>& subscribers) { Lock sync(*this); - if(_destroyed) - { - return; - } // // Splice on the new set of subscribers to SubscriberPool. // @@ -213,10 +213,6 @@ void SubscriberPool::flush(const SubscriberPtr& subscriber) { Lock sync(*this); - if(_destroyed) - { - return; - } _pending.push_back(subscriber); assert(invariants()); notify(); @@ -226,10 +222,6 @@ void SubscriberPool::add(const SubscriberPtr& subscriber) { Lock sync(*this); - if(_destroyed) - { - return; - } _subscribers.push_back(subscriber); assert(invariants()); } @@ -238,10 +230,6 @@ void SubscriberPool::remove(const SubscriberPtr& subscriber) { Lock sync(*this); - if(_destroyed) - { - return; - } // // Note that this cannot remove based on the subscriber id because // the pool is TopicManager scoped and not topic scoped therefore @@ -261,12 +249,6 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: { Lock sync(*this); - if(_destroyed) - { - subscriber = 0; - return; - } - if(subscriber) { if(requeue) @@ -292,16 +274,15 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: // if(_sizeMax != 1) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); // // Reap dead workers, if necessary. // if(_reap > 0) { - if(traceLevels->subscriberPool > 0) + if(_traceLevels->subscriberPool > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); out << "reaping: " << _reap << " workers"; } list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); @@ -354,18 +335,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: } } - if(traceLevels->subscriberPool > 0) + if(_traceLevels->subscriberPool > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); out << "checking stalls. extra workers: " << _workers.size() - _size << " subscribers: " << _subscribers.size() << " stalls: " << stalls; } if((_workers.size() - _size) > stalls) { - if(traceLevels->subscriberPool > 0) + if(_traceLevels->subscriberPool > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); out << "destroying workers"; } ++_reap; @@ -423,8 +404,6 @@ SubscriberPool::destroy() // threads to unblock and terminate. We also copy the list of subscribers // for shutdown. No new subscribers can be added once _destroyed is set. // - std::list<SubscriberPtr> subscribers; - { Lock sync(*this); _destroyed = true; @@ -433,8 +412,6 @@ SubscriberPool::destroy() { _subscriberPoolMonitor->destroy(); } - subscribers = _subscribers; - _subscribers.clear(); } // // Next join with each worker. @@ -444,14 +421,6 @@ SubscriberPool::destroy() (*p)->getThreadControl().join(); } _workers.clear(); - - // - // Shutdown each subscriber. - // - for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) - { - (*q)->shutdown(); - } // // Once all of the workers have gone then we'll no longer have @@ -469,14 +438,17 @@ void SubscriberPool::check() { Lock sync(*this); + if(_destroyed) + { + return; + } - TraceLevelsPtr traceLevels = _instance->traceLevels(); IceUtil::Time now = IceUtil::Time::now(); IceUtil::Time interval = now - _lastDequeue; /* - if(traceLevels->subscriberPool > 1) + if(_traceLevels->subscriberPool > 1) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); out << "check called: interval: " << interval << " timeout: " << _timeout << " pending: " << _pending.size() << " running: " << _workers.size() << " sizeMax: " << _sizeMax; @@ -485,9 +457,9 @@ SubscriberPool::check() if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0)) { - if(traceLevels->subscriberPool > 0) + if(_traceLevels->subscriberPool > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat); + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); out << "detected stall: creating thread: threads: " << _workers.size(); } |