author     Bernard Normier <bernard@zeroc.com>  2007-02-01 17:09:49 +0000
committer  Bernard Normier <bernard@zeroc.com>  2007-02-01 17:09:49 +0000
commit     abada90e3f84dc703b8ddc9efcbed8a946fadead (patch)
tree       2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/IceStorm/SubscriberPool.cpp
parent     removing trace message (diff)
download   ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2 / .tar.xz / .zip
Expanded tabs into spaces
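
The change is purely mechanical: every tab in the file is replaced by spaces at a four-column tab stop, which matches the indentation style used in the Ice sources. A whitespace-only conversion like this can be produced by a small filter; the snippet below is only an illustrative sketch under that four-column assumption, not the tool actually used for this commit.

    // Sketch of a tab-expansion filter, assuming a 4-column tab stop
    // (the tool actually used for this commit is not recorded).
    // Reads stdin and writes stdout with each tab replaced by spaces
    // up to the next tab stop.
    #include <iostream>
    #include <string>

    int main()
    {
        const std::string::size_type tabStop = 4; // assumed tab width
        std::string line;
        while(std::getline(std::cin, line))
        {
            std::string out;
            for(std::string::size_type i = 0; i < line.size(); ++i)
            {
                if(line[i] == '\t')
                {
                    // Pad with spaces until the column reaches the next
                    // multiple of tabStop.
                    do
                    {
                        out += ' ';
                    }
                    while(out.size() % tabStop != 0);
                }
                else
                {
                    out += line[i];
                }
            }
            std::cout << out << '\n';
        }
        return 0;
    }

Running the original file through such a filter and diffing against it should produce exactly the kind of diff shown below, with the same number of insertions and deletions and no semantic changes.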
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r--  cpp/src/IceStorm/SubscriberPool.cpp | 422
1 file changed, 211 insertions(+), 211 deletions(-)
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 42c92fd97cf..1d1fd773028 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -27,9 +27,9 @@ class SubscriberPoolWorker : public IceUtil::Thread
public:
SubscriberPoolWorker(const SubscriberPoolPtr& manager) :
- _manager(manager)
+ _manager(manager)
{
- start();
+ start();
}
~SubscriberPoolWorker()
@@ -39,35 +39,35 @@ public:
virtual void
run()
{
- IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
- SubscriberPtr sub;
- bool requeue = false;
- bool computeInterval = false;
- while(true)
- {
- _manager->dequeue(sub, requeue, interval, computeInterval);
- if(!sub)
- {
- return;
- }
-
- //
- // If SubscriberPool returns true then the subscriber
- // needs to be SubscriberPooled again, so therefore we
- // will re-enqueue the subscriber in the call to dequeue.
- //
- if(computeInterval)
- {
- IceUtil::Time start = IceUtil::Time::now();
- requeue = sub->flush();
- interval = IceUtil::Time::now() - start;
- }
- else
- {
- requeue = sub->flush();
- interval = IceUtil::Time::seconds(24 * 60); // A long time.
- }
- }
+ IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
+ SubscriberPtr sub;
+ bool requeue = false;
+ bool computeInterval = false;
+ while(true)
+ {
+ _manager->dequeue(sub, requeue, interval, computeInterval);
+ if(!sub)
+ {
+ return;
+ }
+
+ //
+ // If SubscriberPool returns true then the subscriber
+ // needs to be SubscriberPooled again, so therefore we
+ // will re-enqueue the subscriber in the call to dequeue.
+ //
+ if(computeInterval)
+ {
+ IceUtil::Time start = IceUtil::Time::now();
+ requeue = sub->flush();
+ interval = IceUtil::Time::now() - start;
+ }
+ else
+ {
+ requeue = sub->flush();
+ interval = IceUtil::Time::seconds(24 * 60); // A long time.
+ }
+ }
}
private:
@@ -96,32 +96,32 @@ SubscriberPoolMonitor::run()
for(;;)
{
{
- Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
- if(_needCheck)
- {
- timedWait(_timeout);
- //
- // Monitoring was stopped.
- //
- if(!_needCheck)
- {
- continue;
- }
- if(_destroyed)
- {
- return;
- }
- }
- else
- {
- wait();
- continue;
- }
+ if(_needCheck)
+ {
+ timedWait(_timeout);
+ //
+ // Monitoring was stopped.
+ //
+ if(!_needCheck)
+ {
+ continue;
+ }
+ if(_destroyed)
+ {
+ return;
+ }
+ }
+ else
+ {
+ wait();
+ continue;
+ }
}
//
// Call outside of the lock to prevent any deadlocks.
@@ -163,7 +163,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
_size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)),
// minimum 50ms, default 1s.
_timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.Timeout", 1000), 50))),
+ "IceStorm.SubscriberPool.Timeout", 1000), 50))),
// 10 * the stall timeout.
_stallCheck(_timeout * 10),
_destroyed(false),
@@ -171,23 +171,23 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
{
try
{
- __setNoDelete(true);
- _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
- for(unsigned int i = 0; i < _size; ++i)
- {
- ++_inUse;
- _workers.push_back(new SubscriberPoolWorker(this));
- }
+ __setNoDelete(true);
+ _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
+ for(unsigned int i = 0; i < _size; ++i)
+ {
+ ++_inUse;
+ _workers.push_back(new SubscriberPoolWorker(this));
+ }
}
catch(const IceUtil::Exception& ex)
{
- {
- Ice::Error out(_traceLevels->logger);
- out << "SubscriberPool: " << ex;
- }
- destroy();
- __setNoDelete(false);
- throw;
+ {
+ Ice::Error out(_traceLevels->logger);
+ out << "SubscriberPool: " << ex;
+ }
+ destroy();
+ __setNoDelete(false);
+ throw;
}
__setNoDelete(false);
@@ -203,7 +203,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
//
// Splice on the new set of subscribers to SubscriberPool.
@@ -219,7 +219,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
_pending.push_back(subscriber);
assert(invariants());
@@ -232,7 +232,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
_subscribers.push_back(subscriber);
assert(invariants());
@@ -244,7 +244,7 @@ SubscriberPool::remove(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
//
// Note that this cannot remove based on the subscriber id because
@@ -267,18 +267,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
if(_destroyed)
{
- subscriber = 0;
- return;
+ subscriber = 0;
+ return;
}
if(subscriber)
{
- if(requeue)
- {
- _pending.push_back(subscriber);
- assert(invariants());
- }
- subscriber->flushTime(interval);
+ if(requeue)
+ {
+ _pending.push_back(subscriber);
+ assert(invariants());
+ }
+ subscriber->flushTime(interval);
}
//
// Clear the reference.
@@ -297,99 +297,99 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
if(_sizeMax != 1)
{
- //
- // Reap dead workers, if necessary.
- //
- if(_reap > 0)
- {
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "reaping: " << _reap << " workers";
- }
- list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
- while(p != _workers.end() && _reap > 0)
- {
- if(!(*p)->isAlive())
- {
- (*p)->getThreadControl().join();
- p = _workers.erase(p);
- --_reap;
- }
- else
- {
- ++p;
- }
- }
- }
-
- //
- // If we have extra workers every _stallCheck period we run
- // through the complete set of subscribers and determine how
- // many have stalled since the last check. If this number is
- // less than the number of extra threads then we terminate the
- // calling worker.
- //
- // - The flush time is protected by the subscriber pool mutex.
- // - The flush time is only computed if we have extra threads,
- // otherwise it is set to some large value.
- // - The max flush time is reset to the next sending interval
- // after after _stallCheck period.
- // - Every subscriber is considered to be stalled iff it has
- // never sent an event or we have just created the first
- // additional worker. The first handles the case where a
- // subscriber stalls for a long time on the first message
- // send. The second means that we can disable computation of
- // the flush latency if there are no additional threads.
- //
- if(_workers.size() > _size)
- {
- IceUtil::Time now = IceUtil::Time::now();
- if(now - _lastStallCheck > _stallCheck)
- {
- _lastStallCheck = now;
- unsigned int stalls = 0;
- for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
- {
- if((*p)->pollMaxFlushTime(now) > _timeout)
- {
- ++stalls;
- }
- }
-
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "checking stalls. extra workers: " << _workers.size() - _size
- << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
- }
-
- if((_workers.size() - _size) > stalls)
- {
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "destroying workers";
- }
- ++_reap;
- return;
- }
- }
- }
+ //
+ // Reap dead workers, if necessary.
+ //
+ if(_reap > 0)
+ {
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "reaping: " << _reap << " workers";
+ }
+ list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
+ while(p != _workers.end() && _reap > 0)
+ {
+ if(!(*p)->isAlive())
+ {
+ (*p)->getThreadControl().join();
+ p = _workers.erase(p);
+ --_reap;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+ }
+
+ //
+ // If we have extra workers every _stallCheck period we run
+ // through the complete set of subscribers and determine how
+ // many have stalled since the last check. If this number is
+ // less than the number of extra threads then we terminate the
+ // calling worker.
+ //
+ // - The flush time is protected by the subscriber pool mutex.
+ // - The flush time is only computed if we have extra threads,
+ // otherwise it is set to some large value.
+ // - The max flush time is reset to the next sending interval
+ // after after _stallCheck period.
+ // - Every subscriber is considered to be stalled iff it has
+ // never sent an event or we have just created the first
+ // additional worker. The first handles the case where a
+ // subscriber stalls for a long time on the first message
+ // send. The second means that we can disable computation of
+ // the flush latency if there are no additional threads.
+ //
+ if(_workers.size() > _size)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(now - _lastStallCheck > _stallCheck)
+ {
+ _lastStallCheck = now;
+ unsigned int stalls = 0;
+ for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ if((*p)->pollMaxFlushTime(now) > _timeout)
+ {
+ ++stalls;
+ }
+ }
+
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "checking stalls. extra workers: " << _workers.size() - _size
+ << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
+ }
+
+ if((_workers.size() - _size) > stalls)
+ {
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "destroying workers";
+ }
+ ++_reap;
+ return;
+ }
+ }
+ }
}
-
+
while(_pending.empty() && !_destroyed)
{
- //
- // If we wait then there is no need to monitor anymore.
- //
- _subscriberPoolMonitor->stopMonitor();
- wait();
+ //
+ // If we wait then there is no need to monitor anymore.
+ //
+ _subscriberPoolMonitor->stopMonitor();
+ wait();
}
if(_destroyed)
{
- return;
+ return;
}
_lastDequeue = IceUtil::Time::now();
@@ -405,11 +405,11 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
//
if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1))
{
- _subscriberPoolMonitor->startMonitor();
+ _subscriberPoolMonitor->startMonitor();
}
else
{
- _subscriberPoolMonitor->stopMonitor();
+ _subscriberPoolMonitor->stopMonitor();
}
//
// We only need to compute the push interval if we've created
@@ -429,22 +429,22 @@ SubscriberPool::destroy()
// _destroyed is set.
//
{
- Lock sync(*this);
- _destroyed = true;
- notifyAll();
- if(_subscriberPoolMonitor)
- {
- _subscriberPoolMonitor->destroy();
- }
- _subscribers.clear();
- _pending.clear();
+ Lock sync(*this);
+ _destroyed = true;
+ notifyAll();
+ if(_subscriberPoolMonitor)
+ {
+ _subscriberPoolMonitor->destroy();
+ }
+ _subscribers.clear();
+ _pending.clear();
}
//
// Next join with each worker.
//
for(list<IceUtil::ThreadPtr>::const_iterator p = _workers.begin(); p != _workers.end(); ++p)
{
- (*p)->getThreadControl().join();
+ (*p)->getThreadControl().join();
}
_workers.clear();
@@ -455,8 +455,8 @@ SubscriberPool::destroy()
//
if(_subscriberPoolMonitor)
{
- _subscriberPoolMonitor->getThreadControl().join();
- _subscriberPoolMonitor = 0;
+ _subscriberPoolMonitor->getThreadControl().join();
+ _subscriberPoolMonitor = 0;
}
}
@@ -466,7 +466,7 @@ SubscriberPool::check()
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
IceUtil::Time now = IceUtil::Time::now();
@@ -474,34 +474,34 @@ SubscriberPool::check()
/*
if(_traceLevels->subscriberPool > 1)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "check called: interval: " << interval << " timeout: " << _timeout
- << " pending: " << _pending.size() << " running: " << _workers.size()
- << " sizeMax: " << _sizeMax;
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "check called: interval: " << interval << " timeout: " << _timeout
+ << " pending: " << _pending.size() << " running: " << _workers.size()
+ << " sizeMax: " << _sizeMax;
}
*/
if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
{
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "detected stall: creating thread: threads: " << _workers.size();
- }
-
- //
- // We'll now start stall checking at regular intervals if this
- // is the first newly created worker. Here we need to
- // initially set the stall check and the number of requests at
- // this point.
- //
- if(_workers.size() == _size)
- {
- _lastStallCheck = now;
- }
-
- ++_inUse;
- _workers.push_back(new SubscriberPoolWorker(this));
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "detected stall: creating thread: threads: " << _workers.size();
+ }
+
+ //
+ // We'll now start stall checking at regular intervals if this
+ // is the first newly created worker. Here we need to
+ // initially set the stall check and the number of requests at
+ // this point.
+ //
+ if(_workers.size() == _size)
+ {
+ _lastStallCheck = now;
+ }
+
+ ++_inUse;
+ _workers.push_back(new SubscriberPoolWorker(this));
}
}
@@ -512,14 +512,14 @@ SubscriberPool::invariants()
list<SubscriberPtr>::const_iterator p;
for(p = _subscribers.begin(); p != _subscribers.end(); ++p)
{
- assert(subs.find(*p) == subs.end());
- subs.insert(*p);
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
}
subs.clear();
for(p = _pending.begin(); p != _pending.end(); ++p)
{
- assert(subs.find(*p) == subs.end());
- subs.insert(*p);
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
}
return true;
}