diff options
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 52 |
1 files changed, 43 insertions, 9 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index ee3865bc74d..f118874295f 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -81,7 +81,7 @@ SubscriberPoolMonitor::SubscriberPoolMonitor(const SubscriberPoolPtr& manager, c _manager(manager), _timeout(timeout), _needCheck(false), - _destroy(false) + _destroyed(false) { start(); } @@ -97,7 +97,7 @@ SubscriberPoolMonitor::run() { { Lock sync(*this); - if(_destroy) + if(_destroyed) { return; } @@ -148,7 +148,7 @@ void SubscriberPoolMonitor::destroy() { Lock sync(*this); - _destroy = true; + _destroyed = true; notify(); } @@ -162,7 +162,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : "IceStorm.SubscriberPool.Timeout", 1000), 50))), // 10 * the stall timeout. _stallCheck(_timeout * 10), - _destroy(false), + _destroyed(false), _reap(0) { try @@ -197,6 +197,10 @@ void SubscriberPool::flush(list<SubscriberPtr>& subscribers) { Lock sync(*this); + if(_destroyed) + { + return; + } // // Splice on the new set of subscribers to SubscriberPool. // @@ -209,6 +213,10 @@ void SubscriberPool::flush(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } _pending.push_back(subscriber); assert(invariants()); notify(); @@ -218,6 +226,10 @@ void SubscriberPool::add(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } _subscribers.push_back(subscriber); assert(invariants()); } @@ -226,6 +238,10 @@ void SubscriberPool::remove(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } // // Note that this cannot remove based on the subscriber id because // the pool is TopicManager scoped and not topic scoped therefore @@ -245,6 +261,12 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: { Lock sync(*this); + if(_destroyed) + { + subscriber = 0; + return; + } + if(subscriber) { if(requeue) @@ -353,7 +375,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: } } - while(_pending.empty() && !_destroy) + while(_pending.empty() && !_destroyed) { // // If we wait then there is no need to monitor anymore. @@ -362,7 +384,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: wait(); } - if(_destroy) + if(_destroyed) { return; } @@ -398,18 +420,22 @@ SubscriberPool::destroy() { // // First mark the pool as destroyed. This causes all of the worker - // threads to unblock and terminate. + // threads to unblock and terminate. We also copy the list of subscribers + // for shutdown. No new subscribers can be added once _destroyed is set. // + std::list<SubscriberPtr> subscribers; + { Lock sync(*this); - _destroy = true; + _destroyed = true; notifyAll(); if(_subscriberPoolMonitor) { _subscriberPoolMonitor->destroy(); } + subscribers = _subscribers; + _subscribers.clear(); } - // // Next join with each worker. // @@ -420,6 +446,14 @@ SubscriberPool::destroy() _workers.clear(); // + // Shutdown each subscriber. + // + for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) + { + (*q)->shutdown(); + } + + // // Once all of the workers have gone then we'll no longer have // concurrent access to the pool monitor, so we can join with it // and then clear to remove the circular reference count. |