summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp84
1 files changed, 47 insertions, 37 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 87dfde4f6c7..8ed30f7330a 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -24,18 +24,18 @@
using namespace IceStorm;
using namespace std;
-namespace IceStorm
+namespace
{
//
// The servant has a 1-1 association with a topic. It is used to
// receive events from Publishers.
//
-class PublisherProxyI : public Ice::BlobjectArray
+class PublisherI : public Ice::BlobjectArray
{
public:
- PublisherProxyI(const TopicIPtr& topic) :
+ PublisherI(const TopicIPtr& topic) :
_topic(topic)
{
}
@@ -51,7 +51,12 @@ public:
Ice::ByteSeq(),
current.ctx);
- event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
EventSeq v;
v.push_back(event);
@@ -109,7 +114,7 @@ private:
const SubscriberPtr _subscriber;
};
-} // End namespace IceStorm
+}
TopicI::TopicI(
const InstancePtr& instance,
@@ -133,7 +138,7 @@ TopicI::TopicI(
Ice::Identity id;
id.category = _name;
id.name = "publish";
- _publisherPrx = _instance->objectAdapter()->add(new PublisherProxyI(this), id);
+ _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), id);
//
// Create a servant per topic to receive linked event data. The
@@ -164,6 +169,7 @@ TopicI::TopicI(
_instance->objectAdapter()->add(
new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity()));
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
PersistentUpstreamMap::const_iterator upI = _upstream.find(_name);
@@ -250,10 +256,13 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
if(p != _subscribers.end())
{
(*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
_subscribers.erase(p);
}
- _subscribers.push_back(Subscriber::create(_instance, obj, qos));
+ SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
+ _subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
Ice::ObjectPrx
@@ -288,6 +297,7 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons
SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
return subscriber->proxy();
}
@@ -407,6 +417,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
IceUtil::Mutex::Lock subscriberSync(_subscribersMutex);
_subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
void
@@ -622,11 +633,8 @@ TopicI::reap()
//
list<SubscriberPtr> error;
{
- //
- // Uses splice for efficiency
- //
IceUtil::Mutex::Lock errorSync(_errorMutex);
- error.splice(error.begin(), _error);
+ _error.swap(error);
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -675,31 +683,6 @@ TopicI::reap()
}
void
-TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
-{
- Ice::Identity ident = obj->ice_getIdentity();
-
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident);
- if(p != _subscribers.end())
- {
- (*p)->destroy();
- _subscribers.erase(p);
- return;
- }
-
- //
- // If the subscriber was not found then display a diagnostic.
- //
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _instance->communicator()->identityToString(ident) << ": not subscribed.";
- }
-}
-
-void
TopicI::publish(bool forwarded, const EventSeq& events)
{
//
@@ -740,7 +723,7 @@ TopicI::publish(bool forwarded, const EventSeq& events)
//
if(!flush.empty())
{
- _instance->subscriberPool()->add(flush);
+ _instance->subscriberPool()->flush(flush);
}
//
@@ -778,6 +761,7 @@ TopicI::publish(bool forwarded, const EventSeq& events)
{
reap.push_back(*q);
}
+ _instance->subscriberPool()->remove(*q);
_subscribers.erase(q);
}
}
@@ -793,3 +777,29 @@ TopicI::publish(bool forwarded, const EventSeq& events)
_error.splice(_error.begin(), reap);
}
}
+
+void
+TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
+{
+ Ice::Identity ident = obj->ice_getIdentity();
+
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), ident);
+ if(p != _subscribers.end())
+ {
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
+ return;
+ }
+
+ //
+ // If the subscriber was not found then display a diagnostic.
+ //
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _instance->communicator()->identityToString(ident) << ": not subscribed.";
+ }
+}