diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index 3c354627413..7f1ea6b67b9 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -479,6 +479,11 @@ TopicImpl::TopicImpl( out << " failed: " << ex; } } + + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0)); + } } catch(...) { @@ -992,6 +997,8 @@ TopicImpl::shutdown() { (*p)->shutdown(); } + + _observer.detach(); } LinkInfoSeq @@ -1036,6 +1043,8 @@ TopicImpl::destroy() // destroyInternal clears out the topic content. LogUpdate llu = {0,0}; _instance->observers()->destroyTopic(destroyInternal(llu, true), _name); + + _observer.detach(); } TopicContent @@ -1192,6 +1201,17 @@ TopicImpl::publish(bool forwarded, const EventDataSeq& events) vector<SubscriberPtr> copy; { IceUtil::Mutex::Lock sync(_subscribersMutex); + if(_observer) + { + if(forwarded) + { + _observer->forwarded(); + } + else + { + _observer->published(); + } + } copy = _subscribers; } @@ -1408,6 +1428,26 @@ TopicImpl::getServant() const return _servant; } +void +TopicImpl::updateObserver() +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get())); + } +} + +void +TopicImpl::updateSubscriberObservers() +{ + IceUtil::Mutex::Lock sync(_subscribersMutex); + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->updateObserver(); + } +} + LogUpdate TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master) { |