summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Instance.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Instance.cpp')
-rw-r--r--cpp/src/IceStorm/Instance.cpp29
1 files changed, 26 insertions, 3 deletions
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp
index 0f022b4e77c..35c74c96ba3 100644
--- a/cpp/src/IceStorm/Instance.cpp
+++ b/cpp/src/IceStorm/Instance.cpp
@@ -22,6 +22,22 @@ using namespace std;
using namespace IceStorm;
using namespace IceStormElection;
+void
+TopicReaper::add(const string& name)
+{
+ Lock sync(*this);
+ _topics.push_back(name);
+}
+
+vector<string>
+TopicReaper::consumeReapedTopics()
+{
+ Lock sync(*this);
+ vector<string> reaped;
+ reaped.swap(_topics);
+ return reaped;
+}
+
Instance::Instance(
const string& instanceName,
const string& name,
@@ -43,7 +59,8 @@ Instance::Instance(
_flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault(
name + ".Flush.Timeout", 1000))), // default one second.
// default one minute.
- _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000))
+ _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)),
+ _topicReaper(new TopicReaper())
{
try
{
@@ -66,12 +83,12 @@ Instance::Instance(
_observers = new Observers(this);
_batchFlusher = new IceUtil::Timer();
_timer = new IceUtil::Timer();
-
+
//
// If an Ice metrics observer is setup on the communicator, also
// enable metrics for IceStorm.
//
- IceInternal::CommunicatorObserverIPtr o =
+ IceInternal::CommunicatorObserverIPtr o =
IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver());
if(o)
{
@@ -196,6 +213,12 @@ Instance::observer() const
return _observer;
}
+IceStorm::TopicReaperPtr
+Instance::topicReaper() const
+{
+ return _topicReaper;
+}
+
IceUtil::Time
Instance::discardInterval() const
{