diff options
Diffstat (limited to 'cpp/src/IceStorm/Flusher.cpp')
-rw-r--r-- | cpp/src/IceStorm/Flusher.cpp | 211 |
1 files changed, 95 insertions, 116 deletions
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp index 097de86d24d..0c9142db350 100644 --- a/cpp/src/IceStorm/Flusher.cpp +++ b/cpp/src/IceStorm/Flusher.cpp @@ -14,146 +14,125 @@ #include <IceStorm/TraceLevels.h> #include <IceStorm/Flusher.h> #include <algorithm> -#include <list> using namespace IceStorm; using namespace std; -namespace IceStorm +FlusherThread::FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) : + _communicator(communicator), + _traceLevels(traceLevels), + _destroy(false) { - -typedef std::list<FlushablePtr> FlushableList; - -class FlusherThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) : - _communicator(communicator), - _traceLevels(traceLevels), - _destroy(false) + _flushTime = communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Flush.Timeout", 1000); + if(_flushTime < 100) { - _flushTime = communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Flush.Timeout", 1000); - if(_flushTime < 100) - { - _flushTime = 100; // Minimum of 100 ms - } + _flushTime = 100; // Minimum of 100 ms } +} - ~FlusherThread() - { - } +FlusherThread::~FlusherThread() +{ +} - virtual void - run() +void +FlusherThread::run() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + while(!_destroy) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while(!_destroy) - { - long tout = calcTimeout(); - if(tout == 0) - { - wait(); - } - else - { - timedWait(IceUtil::Time::milliSeconds(tout)); - } - if(_destroy) - { - continue; - } - flushAll(); - } - - // - // We break a cycle by clearing the subscriber list. - // - _subscribers.clear(); + long tout = calcTimeout(); + if(tout == 0) + { + wait(); + } + else + { + timedWait(IceUtil::Time::milliSeconds(tout)); + } + if(_destroy) + { + continue; + } + flushAll(); } - void - destroy() - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _destroy = true; - notify(); - } + // + // We break a cycle by clearing the subscriber list. + // + _subscribers.clear(); +} + +void +FlusherThread::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _destroy = true; + notify(); +} + +// +// It would be possible to write add/remove in such a way as to +// avoid blocking while flushing by having a queue of actions +// which are only performed before flushing. For now, however, +// this issue is ignored. +// +void +FlusherThread::add(const FlushablePtr& subscriber) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + bool isEmpty = _subscribers.empty(); + _subscribers.push_back(subscriber); // - // It would be possible to write add/remove in such a way as to - // avoid blocking while flushing by having a queue of actions - // which are only performed before flushing. For now, however, - // this issue is ignored. + // If the set of subscribers was previously empty then wake up + // the flushing thread since it will be waiting indefinitely // - void - add(const FlushablePtr& subscriber) + if(isEmpty) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - bool isEmpty = _subscribers.empty(); - _subscribers.push_back(subscriber); - - // - // If the set of subscribers was previously empty then wake up - // the flushing thread since it will be waiting indefinitely - // - if(isEmpty) - { - notify(); - } + notify(); } +} - void - remove(const FlushablePtr& subscriber) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _subscribers.remove(subscriber); - } +void +FlusherThread::remove(const FlushablePtr& subscriber) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _subscribers.remove(subscriber); +} -private: +void +FlusherThread::flushAll() +{ + // This is always called with the monitor locked + //IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - void - flushAll() - { - // This is always called with the monitor locked - //IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // remove_if doesn't work with handle types. remove_if also - // isn't present in the STLport implementation - // - // _subscribers.remove_if(IceUtil::constMemFun(&Flushable::inactive)); - // - _subscribers.erase(remove_if(_subscribers.begin(), _subscribers.end(), - IceUtil::constMemFun(&Flushable::inactive)), _subscribers.end()); - - _communicator->flushBatchRequests(); - - // - // Trace after the flush so that the correct number of objects - // are displayed - // - if(_traceLevels->flush > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat); - out << _subscribers.size() << " object(s)"; - } - } - long - calcTimeout() + // remove_if doesn't work with handle types. remove_if also + // isn't present in the STLport implementation + // + // _subscribers.remove_if(IceUtil::constMemFun(&Flushable::inactive)); + // + _subscribers.erase(remove_if(_subscribers.begin(), _subscribers.end(), + IceUtil::constMemFun(&Flushable::inactive)), _subscribers.end()); + + _communicator->flushBatchRequests(); + + // + // Trace after the flush so that the correct number of objects + // are displayed + // + if(_traceLevels->flush > 0) { - return (_subscribers.empty()) ? 0 : _flushTime; + Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat); + out << _subscribers.size() << " object(s)"; } +} - Ice::CommunicatorPtr _communicator; - TraceLevelsPtr _traceLevels; - FlushableList _subscribers; - bool _destroy; - long _flushTime; -}; - -} // End namespace IceStorm +long +FlusherThread::calcTimeout() +{ + return (_subscribers.empty()) ? 0 : _flushTime; +} Flusher::Flusher(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) { |