summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp40
1 files changed, 40 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 3c354627413..7f1ea6b67b9 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -479,6 +479,11 @@ TopicImpl::TopicImpl(
out << " failed: " << ex;
}
}
+
+ if(_instance->observer())
+ {
+ _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
+ }
}
catch(...)
{
@@ -992,6 +997,8 @@ TopicImpl::shutdown()
{
(*p)->shutdown();
}
+
+ _observer.detach();
}
LinkInfoSeq
@@ -1036,6 +1043,8 @@ TopicImpl::destroy()
// destroyInternal clears out the topic content.
LogUpdate llu = {0,0};
_instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
+
+ _observer.detach();
}
TopicContent
@@ -1192,6 +1201,17 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events)
vector<SubscriberPtr> copy;
{
IceUtil::Mutex::Lock sync(_subscribersMutex);
+ if(_observer)
+ {
+ if(forwarded)
+ {
+ _observer->forwarded();
+ }
+ else
+ {
+ _observer->published();
+ }
+ }
copy = _subscribers;
}
@@ -1408,6 +1428,26 @@ TopicImpl::getServant() const
return _servant;
}
+void
+TopicImpl::updateObserver()
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ if(_instance->observer())
+ {
+ _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get()));
+ }
+}
+
+void
+TopicImpl::updateSubscriberObservers()
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ (*p)->updateObserver();
+ }
+}
+
LogUpdate
TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
{