summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Flusher.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2001-11-26 15:53:00 +0000
committerMatthew Newhook <matthew@zeroc.com>2001-11-26 15:53:00 +0000
commit021c6615fd0b299d7742c82f7d6b76f4a813f397 (patch)
treeaf38dfba0fc3204ce8a0572777b92883e6e657f8 /cpp/src/IceStorm/Flusher.cpp
parentBlobject (diff)
downloadice-021c6615fd0b299d7742c82f7d6b76f4a813f397.tar.bz2
ice-021c6615fd0b299d7742c82f7d6b76f4a813f397.tar.xz
ice-021c6615fd0b299d7742c82f7d6b76f4a813f397.zip
Initial version of IceStorm.
Diffstat (limited to 'cpp/src/IceStorm/Flusher.cpp')
-rw-r--r--cpp/src/IceStorm/Flusher.cpp211
1 files changed, 211 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp
new file mode 100644
index 00000000000..3fb95ec0715
--- /dev/null
+++ b/cpp/src/IceStorm/Flusher.cpp
@@ -0,0 +1,211 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+#include <Ice/Functional.h>
+
+#include <IceStorm/Flusher.h>
+#include <IceStorm/Subscriber.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <algorithm>
+
+using namespace IceStorm;
+using namespace std;
+
+void IceStorm::incRef(Flusher* p) { p->__incRef(); }
+void IceStorm::decRef(Flusher* p) { p->__decRef(); }
+
+namespace IceStorm
+{
+
+class FlusherThread : public JTCThread, public JTCMonitor
+{
+public:
+
+ FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) :
+ _traceLevels(traceLevels),
+ _logger(communicator->getLogger())
+ {
+ Ice::PropertiesPtr properties = communicator->getProperties();
+ string value;
+ value = properties->getProperty("IceStorm.Flush.Timeout");
+ if (!value.empty())
+ {
+ _flushTime = atoi(value.c_str());
+ if (_flushTime < 100)
+ {
+ _flushTime = 100; // Minimum of 100 ms
+ }
+ }
+ else
+ {
+ _flushTime = 1000; // Default of 1 second
+ }
+ }
+
+ ~FlusherThread()
+ {
+ }
+
+ virtual void
+ run()
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ while (!_destroy)
+ {
+ long tout = calcTimeout();
+ try
+ {
+ if (tout == 0)
+ {
+ wait();
+ }
+ else
+ {
+ wait(tout);
+ }
+ }
+ catch(const JTCInterruptedException&)
+ {
+ }
+ if (_destroy)
+ {
+ continue;
+ }
+ flushAll();
+ }
+ }
+
+ void
+ destroy()
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ _destroy = true;
+ notify();
+ }
+
+ //
+ // It would be possible to write add/remove in such a way as to
+ // avoid blocking while flushing by having a queue of actions
+ // which are only performed before flushing. For now, however,
+ // this issue is ignored.
+ //
+ void
+ add(const SubscriberPtr& subscriber)
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ bool isEmpty = _subscribers.empty();
+ _subscribers.push_back(subscriber);
+
+ //
+ // If the set of subscribers was previously empty then wake up
+ // the flushing thread since it will be waiting indefinitely
+ //
+ if (isEmpty)
+ {
+ notify();
+ }
+ }
+
+ void
+ remove(const SubscriberPtr& subscriber)
+ {
+ JTCSyncT<JTCMonitor> sync(*this);
+ _subscribers.remove(subscriber);
+ }
+
+
+private:
+
+ void
+ flushAll()
+ {
+ // This is always called with the monitor locked
+ //JTCSyncT<JTCMonitor> sync(*this);
+
+ //
+ // Using standard algorithms I don't think there is a way to
+ // do this in one pass. For instance, I thought about using
+ // remove_if - but the predicate needs to be a pure function
+ // (see Meyers for details). If this is fixed then fix TopicI
+ // also.
+ //
+ _subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid));
+ for_each(_subscribers.begin(), _subscribers.end(), Ice::voidMemFun(&Subscriber::flush));
+
+ //
+ // Trace after the flush so that the correct number of objects
+ // are displayed
+ //
+ if (_traceLevels->flush > 0)
+ {
+ ostringstream s;
+ s << _subscribers.size() << " object(s)";
+
+ _logger->trace(_traceLevels->flushCat, s.str());
+ }
+ }
+
+ long
+ calcTimeout()
+ {
+ return (_subscribers.empty()) ? 0 : _flushTime;
+ }
+
+ TraceLevelsPtr _traceLevels;
+ Ice::LoggerPtr _logger;
+
+ SubscriberList _subscribers;
+ bool _destroy;
+ long _flushTime;
+};
+
+} // End namespace IceStorm
+
+Flusher::Flusher(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels)
+{
+ _thread = new FlusherThread(communicator, traceLevels);
+ _thread->start();
+}
+
+Flusher::~Flusher()
+{
+ _thread->destroy();
+ while(_thread->isAlive())
+ {
+ try
+ {
+ _thread->join();
+ }
+ catch(const JTCInterruptedException&)
+ {
+ }
+ }
+}
+
+void
+Flusher::add(const SubscriberPtr& subscriber)
+{
+ _thread->add(subscriber);
+}
+
+void
+Flusher::remove(const SubscriberPtr& subscriber)
+{
+ _thread->remove(subscriber);
+}
+
+void
+Flusher::stopFlushing()
+{
+ _thread->destroy();
+}
+