summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SubscriberPool.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2006-11-20 02:09:09 +0000
committerMatthew Newhook <matthew@zeroc.com>2006-11-20 02:09:09 +0000
commitd994d0d5549aa90a50559888a2c29f6b709ce70d (patch)
tree09a78e7f6bc71af7d2d99d3d9f0abaed0f991853 /cpp/src/IceStorm/SubscriberPool.cpp
parentminor fix (diff)
downloadice-d994d0d5549aa90a50559888a2c29f6b709ce70d.tar.bz2
ice-d994d0d5549aa90a50559888a2c29f6b709ce70d.tar.xz
ice-d994d0d5549aa90a50559888a2c29f6b709ce70d.zip
Added shutdown state.
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp52
1 files changed, 43 insertions, 9 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index ee3865bc74d..f118874295f 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -81,7 +81,7 @@ SubscriberPoolMonitor::SubscriberPoolMonitor(const SubscriberPoolPtr& manager, c
_manager(manager),
_timeout(timeout),
_needCheck(false),
- _destroy(false)
+ _destroyed(false)
{
start();
}
@@ -97,7 +97,7 @@ SubscriberPoolMonitor::run()
{
{
Lock sync(*this);
- if(_destroy)
+ if(_destroyed)
{
return;
}
@@ -148,7 +148,7 @@ void
SubscriberPoolMonitor::destroy()
{
Lock sync(*this);
- _destroy = true;
+ _destroyed = true;
notify();
}
@@ -162,7 +162,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
"IceStorm.SubscriberPool.Timeout", 1000), 50))),
// 10 * the stall timeout.
_stallCheck(_timeout * 10),
- _destroy(false),
+ _destroyed(false),
_reap(0)
{
try
@@ -197,6 +197,10 @@ void
SubscriberPool::flush(list<SubscriberPtr>& subscribers)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
//
// Splice on the new set of subscribers to SubscriberPool.
//
@@ -209,6 +213,10 @@ void
SubscriberPool::flush(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
_pending.push_back(subscriber);
assert(invariants());
notify();
@@ -218,6 +226,10 @@ void
SubscriberPool::add(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
_subscribers.push_back(subscriber);
assert(invariants());
}
@@ -226,6 +238,10 @@ void
SubscriberPool::remove(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
//
// Note that this cannot remove based on the subscriber id because
// the pool is TopicManager scoped and not topic scoped therefore
@@ -245,6 +261,12 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ subscriber = 0;
+ return;
+ }
+
if(subscriber)
{
if(requeue)
@@ -353,7 +375,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
}
}
- while(_pending.empty() && !_destroy)
+ while(_pending.empty() && !_destroyed)
{
//
// If we wait then there is no need to monitor anymore.
@@ -362,7 +384,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
wait();
}
- if(_destroy)
+ if(_destroyed)
{
return;
}
@@ -398,18 +420,22 @@ SubscriberPool::destroy()
{
//
// First mark the pool as destroyed. This causes all of the worker
- // threads to unblock and terminate.
+ // threads to unblock and terminate. We also copy the list of subscribers
+ // for shutdown. No new subscribers can be added once _destroyed is set.
//
+ std::list<SubscriberPtr> subscribers;
+
{
Lock sync(*this);
- _destroy = true;
+ _destroyed = true;
notifyAll();
if(_subscriberPoolMonitor)
{
_subscriberPoolMonitor->destroy();
}
+ subscribers = _subscribers;
+ _subscribers.clear();
}
-
//
// Next join with each worker.
//
@@ -420,6 +446,14 @@ SubscriberPool::destroy()
_workers.clear();
//
+ // Shutdown each subscriber.
+ //
+ for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
+ {
+ (*q)->shutdown();
+ }
+
+ //
// Once all of the workers have gone then we'll no longer have
// concurrent access to the pool monitor, so we can join with it
// and then clear to remove the circular reference count.