summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SubscriberPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp242
1 files changed, 171 insertions, 71 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 6f42665af37..7943d3e1129 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -19,7 +19,7 @@
using namespace IceStorm;
using namespace std;
-namespace IceStorm
+namespace
{
class SubscriberPoolWorker : public IceUtil::Thread
@@ -39,22 +39,33 @@ public:
virtual void
run()
{
+ IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
SubscriberPtr sub;
+ bool requeue = false;
+ bool computeInterval = false;
while(true)
{
- sub = _manager->dequeue(sub);
+ sub = _manager->dequeue(sub, requeue, interval, computeInterval);
if(!sub)
{
return;
}
+
//
- // If SubscriberPool returns true then the subscriber needs to be
- // SubscriberPooled again, so therefore we will re-enqueue the
- // subscriber in the call to dequeue.
+ // If flush returns true then the subscriber needs to be
+ // flushed again, so therefore we will re-enqueue the
+ // subscriber in the call to dequeue.
//
- if(!sub->flush())
+ if(computeInterval)
+ {
+ IceUtil::Time start = IceUtil::Time::now();
+ requeue = sub->flush();
+ interval = IceUtil::Time::now() - start;
+ }
+ else
{
- sub = 0;
+ requeue = sub->flush();
+ interval = IceUtil::Time::seconds(24 * 60); // A long time.
}
}
}
@@ -144,25 +155,23 @@ SubscriberPoolMonitor::destroy()
SubscriberPool::SubscriberPool(const InstancePtr& instance) :
_instance(instance),
_sizeMax(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.SizeMax", -1)),
+ "IceStorm.SubscriberPool.SizeMax", 0)),
_sizeWarn(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.SizeWarn", 0)),
_size(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.Size", 1)),
_timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
"IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms.
+ _stallCheck(_timeout * 10), // 10 * the stall timeout.
_destroy(false),
- _inUse(0),
- _running(0),
- _load(1.0)
+ _reap(0)
{
try
{
__setNoDelete(true);
_subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
- for(int i = 0; i < _size; ++i)
+ for(unsigned int i = 0; i < _size; ++i)
{
- ++_running;
++_inUse;
_workers.push_back(new SubscriberPoolWorker(this));
}
@@ -186,9 +195,9 @@ SubscriberPool::~SubscriberPool()
}
void
-SubscriberPool::add(list<SubscriberPtr>& subscribers)
+SubscriberPool::flush(list<SubscriberPtr>& subscribers)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
//
// Splice on the new set of subscribers to SubscriberPool.
//
@@ -196,76 +205,150 @@ SubscriberPool::add(list<SubscriberPtr>& subscribers)
notifyAll();
}
+void
+SubscriberPool::add(const SubscriberPtr& subscriber)
+{
+ Lock sync(*this); // Hold the pool lock while _subscribers is modified.
+ _subscribers.push_back(subscriber); // Recorded only; _pending is untouched and no waiter is notified.
+}
+
+void
+SubscriberPool::remove(const SubscriberPtr& subscriber)
+{
+ Lock sync(*this); // Hold the pool lock while _subscribers is searched and modified.
+ //
+ // Note that this cannot remove based on the subscriber id because
+ // the pool is TopicManager scoped and not topic scoped, therefore
+ // it's quite possible to have two subscribers with the same id in
+ // the list. Entries are matched by comparing get() results instead.
+ //
+ list<SubscriberPtr>::iterator p = _subscribers.begin();
+ while(p != _subscribers.end())
+ {
+ if((*p).get() == subscriber.get())
+ {
+ _subscribers.erase(p); // Only the first matching entry is erased.
+ return;
+ }
+ ++p;
+ }
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels(); // Reached only when no entry matched.
+ Ice::Error err(traceLevels->logger); // The miss is logged as an error; nothing is thrown here.
+ err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id());
+}
+
//
// The passed subscriber needs to be enqueued again.
//
SubscriberPtr
-SubscriberPool::dequeue(const SubscriberPtr& sub)
+SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
if(sub)
{
- _pending.push_back(sub);
+ if(requeue)
+ {
+ _pending.push_back(sub);
+ }
+ sub->flushTime(interval);
}
//
- // Now we check if this thread can be destroyed, based on a
- // load factor.
+ // The worker is no longer in use.
//
- // The load factor jumps immediately to the number of threads
- // that are currently in use, but decays exponentially if the
- // number of threads in use is smaller than the load
- // factor. This reflects that we create threads immediately
- // when they are needed, but want the number of threads to
- // slowly decline to the configured minimum.
- //
- double inUse = static_cast<double>(_inUse);
- if(_load < inUse)
- {
- _load = inUse;
- }
- else
- {
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + inUse * loadFactor;
- }
+ --_inUse;
- if(_running > _size)
+ //
+ // If _sizeMax is 1 we never spawn up new threads if a stall is
+ // detected.
+ //
+ if(_sizeMax != 1)
{
TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriberPool > 1)
- {
- IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "load check: " << _load;
- }
- int load = static_cast<int>(_load + 0.5);
-
- if(load < _running)
+ //
+ // Reap dead workers, if necessary.
+ //
+ if(_reap > 0)
{
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
if(traceLevels->subscriberPool > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "reducing SubscriberPool threads: load: " << _load << " threads: " << _running;
+ out << "reaping: " << _reap << " workers";
+ }
+ list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
+ while(p != _workers.end() && _reap > 0)
+ {
+ if(!(*p)->isAlive())
+ {
+ (*p)->getThreadControl().join();
+ p = _workers.erase(p);
+ --_reap;
+ }
+ else
+ {
+ ++p;
+ }
}
+ }
- return 0;
+ //
+ // If we have extra workers every _stallCheck period we run
+ // through the complete set of subscribers and determine how
+ // many have stalled since the last check. If this number is
+ // less than the number of extra threads then we terminate the
+ // calling worker.
+ //
+ // - The flush time is protected by the subscriber pool mutex.
+ // - The flush time is only computed if we have extra threads,
+ // otherwise it is set to some large value.
+ // - The max flush time is reset to the next sending interval
+ // after the _stallCheck period.
+ // - Every subscriber is considered to be stalled iff it has
+ // never sent an event or we have just created the first
+ // additional worker. The first handles the case where a
+ // subscriber stalls for a long time on the first message
+ // send. The second means that we can disable computation of
+ // the flush latency if there are no additional threads.
+ //
+ if(_workers.size() > _size)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(now - _lastStallCheck > _stallCheck)
+ {
+ _lastStallCheck = now;
+ unsigned int stalls = 0;
+ for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ if((*p)->pollMaxFlushTime(now) > _timeout)
+ {
+ ++stalls;
+ }
+ }
+
+ if(traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ out << "checking stalls. extra workers: " << _workers.size() - _size
+ << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
+ }
+
+ if((_workers.size() - _size) > stalls)
+ {
+ if(traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
+ out << "destroying workers";
+ }
+ ++_reap;
+ return 0;
+ }
+ }
}
-
}
-
- assert(_inUse > 0);
- --_inUse;
-
+
while(_pending.empty() && !_destroy)
{
//
@@ -277,11 +360,10 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
if(_destroy)
{
- --_running;
return 0;
}
- _lastNext = IceUtil::Time::now();
+ _lastDequeue = IceUtil::Time::now();
SubscriberPtr subscriber = _pending.front();
_pending.pop_front();
@@ -292,7 +374,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
// If all threads are now in use then we need to start the
// monitoring, otherwise we don't need to monitor.
//
- if(_inUse == _running && (_running < _sizeMax || _sizeMax == -1))
+ if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1))
{
_subscriberPoolMonitor->startMonitor();
}
@@ -300,6 +382,11 @@ SubscriberPool::dequeue(const SubscriberPtr& sub)
{
_subscriberPoolMonitor->stopMonitor();
}
+ //
+ // We only need to compute the push interval if we've created
+ // stall threads.
+ //
+ computeInterval = (_workers.size() - _size) > 0;
return subscriber;
}
@@ -311,7 +398,7 @@ SubscriberPool::destroy()
// threads to unblock and terminate.
//
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
_destroy = true;
notifyAll();
if(_subscriberPoolMonitor)
@@ -347,24 +434,37 @@ SubscriberPool::check()
Lock sync(*this);
TraceLevelsPtr traceLevels = _instance->traceLevels();
- IceUtil::Time interval = IceUtil::Time::now() - _lastNext;
+ IceUtil::Time now = IceUtil::Time::now();
+ IceUtil::Time interval = now - _lastDequeue;
+/*
if(traceLevels->subscriberPool > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
out << "check called: interval: " << interval << " timeout: " << _timeout
- << " pending: " << _pending.size() << " running: " << _running
+ << " pending: " << _pending.size() << " running: " << _workers.size()
<< " sizeMax: " << _sizeMax;
}
-
- if(interval > _timeout && _pending.size() > 0 && (_running < _sizeMax || _sizeMax == -1))
+*/
+
+ if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
{
if(traceLevels->subscriberPool > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberPoolCat);
- out << "detected stall: creating thread: load: " << _load << " threads: " << _running;
+ out << "detected stall: creating thread: threads: " << _workers.size();
}
- ++_running;
+ //
+ // We'll now start stall checking at regular intervals if this
+ // is the first newly created worker. Here we need to
+ // initially set the stall check and the number of requests at
+ // this point.
+ //
+ if(_workers.size() == _size)
+ {
+ _lastStallCheck = now;
+ }
+
++_inUse;
_workers.push_back(new SubscriberPoolWorker(this));
}