summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SubscriberPool.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2006-11-20 09:53:45 +0000
committerMatthew Newhook <matthew@zeroc.com>2006-11-20 09:53:45 +0000
commit2ee5941662889fca72b75379844d04ba61865081 (patch)
tree386ae9824dbde1846a2725149741e0f442db569a /cpp/src/IceStorm/SubscriberPool.cpp
parentAdded shutdown state. (diff)
downloadice-2ee5941662889fca72b75379844d04ba61865081.tar.bz2
ice-2ee5941662889fca72b75379844d04ba61865081.tar.xz
ice-2ee5941662889fca72b75379844d04ba61865081.zip
Removed Shutdown state.
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp68
1 files changed, 20 insertions, 48 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index f118874295f..126a5b7fdd6 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -112,6 +112,10 @@ SubscriberPoolMonitor::run()
{
continue;
}
+ if(_destroyed)
+ {
+ return;
+ }
}
else
{
@@ -153,7 +157,7 @@ SubscriberPoolMonitor::destroy()
}
SubscriberPool::SubscriberPool(const InstancePtr& instance) :
- _instance(instance),
+ _traceLevels(instance->traceLevels()),
_sizeMax(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeMax", 0)),
_sizeWarn(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeWarn", 0)),
_size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)),
@@ -178,7 +182,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
catch(const IceUtil::Exception& ex)
{
{
- Ice::Error out(_instance->traceLevels()->logger);
+ Ice::Error out(_traceLevels->logger);
out << "SubscriberPool: " << ex;
}
destroy();
@@ -197,10 +201,6 @@ void
SubscriberPool::flush(list<SubscriberPtr>& subscribers)
{
Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
//
// Splice on the new set of subscribers to SubscriberPool.
//
@@ -213,10 +213,6 @@ void
SubscriberPool::flush(const SubscriberPtr& subscriber)
{
Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
_pending.push_back(subscriber);
assert(invariants());
notify();
@@ -226,10 +222,6 @@ void
SubscriberPool::add(const SubscriberPtr& subscriber)
{
Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
_subscribers.push_back(subscriber);
assert(invariants());
}
@@ -238,10 +230,6 @@ void
SubscriberPool::remove(const SubscriberPtr& subscriber)
{
Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
//
// Note that this cannot remove based on the subscriber id because
// the pool is TopicManager scoped and not topic scoped therefore
@@ -261,12 +249,6 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
{
Lock sync(*this);
- if(_destroyed)
- {
- subscriber = 0;
- return;
- }
-
if(subscriber)
{
if(requeue)
@@ -292,16 +274,15 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
//
if(_sizeMax != 1)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
//
// Reap dead workers, if necessary.
//
if(_reap > 0)
{
- if(traceLevels->subscriberPool > 0)
+ if(_traceLevels->subscriberPool > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
out << "reaping: " << _reap << " workers";
}
list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
@@ -354,18 +335,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
}
}
- if(traceLevels->subscriberPool > 0)
+ if(_traceLevels->subscriberPool > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
out << "checking stalls. extra workers: " << _workers.size() - _size
<< " subscribers: " << _subscribers.size() << " stalls: " << stalls;
}
if((_workers.size() - _size) > stalls)
{
- if(traceLevels->subscriberPool > 0)
+ if(_traceLevels->subscriberPool > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
out << "destroying workers";
}
++_reap;
@@ -423,8 +404,6 @@ SubscriberPool::destroy()
// threads to unblock and terminate. We also copy the list of subscribers
// for shutdown. No new subscribers can be added once _destroyed is set.
//
- std::list<SubscriberPtr> subscribers;
-
{
Lock sync(*this);
_destroyed = true;
@@ -433,8 +412,6 @@ SubscriberPool::destroy()
{
_subscriberPoolMonitor->destroy();
}
- subscribers = _subscribers;
- _subscribers.clear();
}
//
// Next join with each worker.
@@ -444,14 +421,6 @@ SubscriberPool::destroy()
(*p)->getThreadControl().join();
}
_workers.clear();
-
- //
- // Shutdown each subscriber.
- //
- for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
- {
- (*q)->shutdown();
- }
//
// Once all of the workers have gone then we'll no longer have
@@ -469,14 +438,17 @@ void
SubscriberPool::check()
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
- TraceLevelsPtr traceLevels = _instance->traceLevels();
IceUtil::Time now = IceUtil::Time::now();
IceUtil::Time interval = now - _lastDequeue;
/*
- if(traceLevels->subscriberPool > 1)
+ if(_traceLevels->subscriberPool > 1)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
out << "check called: interval: " << interval << " timeout: " << _timeout
<< " pending: " << _pending.size() << " running: " << _workers.size()
<< " sizeMax: " << _sizeMax;
@@ -485,9 +457,9 @@ SubscriberPool::check()
if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
{
- if(traceLevels->subscriberPool > 0)
+ if(_traceLevels->subscriberPool > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
out << "detected stall: creating thread: threads: " << _workers.size();
}