summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/SubscriberPool.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2006-11-16 06:44:20 +0000
committerMatthew Newhook <matthew@zeroc.com>2006-11-16 06:44:20 +0000
commit1cf08b7d8947437614e65619e17f9853898a7f85 (patch)
treeb80eef7a29df082b7a90ae8e02c85fabb6e58012 /cpp/src/IceStorm/SubscriberPool.cpp
parentRemove debug statements (diff)
downloadice-1cf08b7d8947437614e65619e17f9853898a7f85.tar.bz2
ice-1cf08b7d8947437614e65619e17f9853898a7f85.tar.xz
ice-1cf08b7d8947437614e65619e17f9853898a7f85.zip
Removed upstream pinging from IceStorm.
Diffstat (limited to 'cpp/src/IceStorm/SubscriberPool.cpp')
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp78
1 files changed, 46 insertions, 32 deletions
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 4f1e568583c..ee3865bc74d 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -45,7 +45,7 @@ public:
bool computeInterval = false;
while(true)
{
- sub = _manager->dequeue(sub, requeue, interval, computeInterval);
+ _manager->dequeue(sub, requeue, interval, computeInterval);
if(!sub)
{
return;
@@ -154,15 +154,14 @@ SubscriberPoolMonitor::destroy()
SubscriberPool::SubscriberPool(const InstancePtr& instance) :
_instance(instance),
- _sizeMax(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.SizeMax", 0)),
- _sizeWarn(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.SizeWarn", 0)),
- _size(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.Size", 1)),
+ _sizeMax(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeMax", 0)),
+ _sizeWarn(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.SizeWarn", 0)),
+ _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)),
+ // minimum 50ms, default 1s.
_timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.Timeout", 250), 50))), // minimum 50ms.
- _stallCheck(_timeout * 10), // 10 * the stall timeout.
+ "IceStorm.SubscriberPool.Timeout", 1000), 50))),
+ // 10 * the stall timeout.
+ _stallCheck(_timeout * 10),
_destroy(false),
_reap(0)
{
@@ -202,6 +201,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers)
// Splice on the new set of subscribers to SubscriberPool.
//
_pending.splice(_pending.end(), subscribers);
+ assert(invariants());
notifyAll();
}
@@ -210,6 +210,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber)
{
Lock sync(*this);
_pending.push_back(subscriber);
+ assert(invariants());
notify();
}
@@ -218,6 +219,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber)
{
Lock sync(*this);
_subscribers.push_back(subscriber);
+ assert(invariants());
}
void
@@ -230,38 +232,32 @@ SubscriberPool::remove(const SubscriberPtr& subscriber)
// its quite possible to have two subscribers with the same id in
// the list.
//
- list<SubscriberPtr>::iterator p = _subscribers.begin();
- while(p != _subscribers.end())
- {
- if((*p).get() == subscriber.get())
- {
- _subscribers.erase(p);
- return;
- }
- ++p;
- }
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- Ice::Error err(traceLevels->logger);
- err << "SubscriberPool: subscriber not found: " << _instance->communicator()->identityToString(subscriber->id());
+ list<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), subscriber);
+ assert(p != _subscribers.end());
+ _subscribers.erase(p);
}
//
// The passed subscriber need to be enqueued again.
//
-SubscriberPtr
-SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::Time& interval, bool& computeInterval)
+void
+SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::Time& interval, bool& computeInterval)
{
Lock sync(*this);
- if(sub)
+ if(subscriber)
{
if(requeue)
{
- _pending.push_back(sub);
+ _pending.push_back(subscriber);
+ assert(invariants());
}
- sub->flushTime(interval);
+ subscriber->flushTime(interval);
}
+ //
+ // Clear the reference.
+ //
+ subscriber = 0;
//
// The worker is no longer in use.
@@ -351,7 +347,7 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T
out << "destroying workers";
}
++_reap;
- return 0;
+ return;
}
}
}
@@ -368,12 +364,12 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T
if(_destroy)
{
- return 0;
+ return;
}
_lastDequeue = IceUtil::Time::now();
- SubscriberPtr subscriber = _pending.front();
+ subscriber = _pending.front();
_pending.pop_front();
++_inUse;
@@ -395,7 +391,6 @@ SubscriberPool::dequeue(const SubscriberPtr& sub, bool requeue, const IceUtil::T
// stall threads.
//
computeInterval = (_workers.size() - _size) > 0;
- return subscriber;
}
void
@@ -477,3 +472,22 @@ SubscriberPool::check()
_workers.push_back(new SubscriberPoolWorker(this));
}
}
+
+bool
+SubscriberPool::invariants()
+{
+ set<SubscriberPtr> subs;
+ list<SubscriberPtr>::const_iterator p;
+ for(p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
+ }
+ subs.clear();
+ for(p = _pending.begin(); p != _pending.end(); ++p)
+ {
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
+ }
+ return true;
+}