summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2003-03-27 22:01:54 +0000
committerMark Spruiell <mes@zeroc.com>2003-03-27 22:01:54 +0000
commitd14d3b45d87d981c2fada77dc439a054a8d21787 (patch)
treeddeb19290df69b801a089608d070fc945fff9a97 /cpp/src/IceStorm/TopicI.cpp
parentdatagram fixes (diff)
downloadice-d14d3b45d87d981c2fada77dc439a054a8d21787.tar.bz2
ice-d14d3b45d87d981c2fada77dc439a054a8d21787.tar.xz
ice-d14d3b45d87d981c2fada77dc439a054a8d21787.zip
cleaning up topic subscription
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp255
1 files changed, 183 insertions, 72 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index edc1c1c3fd2..3edda8722e2 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -41,8 +41,8 @@ class PublisherProxyI : public Ice::Blobject
{
public:
- PublisherProxyI(const IceStorm::TopicSubscribersPtr& s) :
- _subscribers(s)
+ PublisherProxyI(const IceStorm::TopicSubscribersPtr& s, const string& type) :
+ _subscribers(s), _type(type)
{
}
@@ -58,11 +58,16 @@ private:
// Set of associated subscribers
//
IceStorm::TopicSubscribersPtr _subscribers;
+
+ //
+ // The topic type
+ //
+ string _type;
};
//
// The servant has a 1-1 association with a topic. It is used to
-// receive events from linked Topics..
+// receive events from linked Topics.
//
class TopicLinkI : public TopicLink
{
@@ -115,8 +120,8 @@ public:
// If a subscriber with this identity is already subscribed
// then mark the subscriber as replaced.
//
- // Note that this doesn't actually remove the Subscribe from
- // the list of subscribers - it marks the Subscriber as
+ // Note that this doesn't actually remove the subscriber from
+ // the list of subscribers - it marks the subscriber as
// replaced, and it's removed on the next event publish.
//
for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
@@ -132,17 +137,16 @@ public:
}
}
-
//
- // Add to the set of subscribers
+ // Add to the set of subscribers.
//
_subscribers.push_back(subscriber);
}
//
- // Unsubscribe the Subscriber with the given identity. Note that
- // this doesn't remove the Subscriber from the list of subscribers
- // - it marks the Subscriber as unsubscribed, and it's removed on
+ // Unsubscribe the subscriber with the given identity. Note that
+ // this doesn't remove the subscriber from the list of subscribers
+ // - it marks the subscriber as unsubscribed, and it's removed on
// the next event publish.
//
void
@@ -189,7 +193,7 @@ public:
publish(const Event& event)
{
//
- // Copy of the subscriber list so that event publishing can
+ // Copy the subscriber list so that event publishing can
// occur in parallel.
//
// TODO: Find out whether this is a false optimization - how
@@ -286,27 +290,100 @@ PublisherProxyI::ice_invoke(const vector< Ice::Byte>& inParams, vector< Ice::Byt
{
const Ice::Context& context = current.ctx;
- Event event;
- event.forwarded = false;
- Ice::Context::const_iterator p = context.find("cost");
- if(p != context.end())
+ //
+ // Intercept the operations ice_isA, ice_id, and ice_ids so that
+ // this publisher object can appear to have the topic type.
+ //
+ if(current.operation == "ice_isA")
{
- event.cost = atoi(p->second.c_str());
+ IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator());
+ IceInternal::BasicStream is(instance.get());
+ is.b = inParams;
+ is.i = is.b.begin();
+ string type;
+ is.read(type);
+ bool b;
+ if(type == _type)
+ {
+ b = true;
+ }
+ else
+ {
+ b = Ice::Blobject::ice_isA(type);
+ }
+ IceInternal::BasicStream os(instance.get());
+ os.write(b);
+ outParam = os.b;
+ }
+ else if(current.operation == "ice_id")
+ {
+ IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator());
+ IceInternal::BasicStream os(instance.get());
+ os.write(_type);
+ outParam = os.b;
+ }
+ else if(current.operation == "ice_ids")
+ {
+ IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator());
+ IceInternal::BasicStream os(instance.get());
+ vector<string> ids = Ice::Blobject::ice_ids();
+ ids.insert(ids.begin(), _type);
+ os.write(ids);
+ outParam = os.b;
}
else
{
- event.cost = 0; // TODO: Default comes from property?
+ Event event;
+ event.forwarded = false;
+ Ice::Context::const_iterator p = context.find("cost");
+ if(p != context.end())
+ {
+ event.cost = atoi(p->second.c_str());
+ }
+ else
+ {
+ event.cost = 0; // TODO: Default comes from property?
+ }
+ event.op = current.operation;
+ event.mode = current.mode;
+ event.data = inParams;
+ event.context = context;
+
+ _subscribers->publish(event);
}
- event.op = current.operation;
- event.mode = current.mode;
- event.data = inParams;
- event.context = context;
-
- _subscribers->publish(event);
return true;
}
+#if 0
+bool
+PublisherProxyI::ice_isA(const string& id, const Ice::Current& current)
+{
+ if(id == _type)
+ {
+ return true;
+ }
+ else
+ {
+ return Ice::Blobject::ice_isA(id, current);
+ }
+}
+
+vector<string>
+PublisherProxyI::ice_ids(const Ice::Current& current)
+{
+ vector<string> ids = Ice::Blobject::ice_ids(current);
+ ids.insert(ids.begin(), _type);
+ return ids;
+}
+
+const string&
+PublisherProxyI::ice_id(const Ice::Current&)
+{
+ return _type;
+}
+#endif
+
//
// Incoming events from linked topics.
//
@@ -326,10 +403,11 @@ TopicLinkI::forward(const string& op, Ice::OperationMode mode, const ByteSeq& da
}
TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const string& name,
- const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) :
+ const string& type, const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) :
_adapter(adapter),
_traceLevels(traceLevels),
_name(name),
+ _type(type),
_factory(factory),
_destroyed(false),
_links(db),
@@ -338,11 +416,11 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
_subscribers = new TopicSubscribers(_traceLevels);
//
- // Create a servant per Topic to receive event data. The servants
- // Identity is category=<topicname>, name=publish. Activate the
+ // Create a servant per topic to receive event data. The servant's
+ // identity is category=<topicname>, name=publish. Activate the
// object and save a reference to give to publishers.
//
- _publisher = new PublisherProxyI(_subscribers);
+ _publisher = new PublisherProxyI(_subscribers, type);
Ice::Identity id;
id.category = _name;
@@ -350,8 +428,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
_publisherPrx = _adapter->add(_publisher, id);
//
- // Create a servant per Topic to receive linked event data. The
- // servants Identity is category=<topicname>, name=link. Activate
+ // Create a servant per topic to receive linked event data. The
+ // servant's identity is category=<topicname>, name=link. Activate
// the object and save a reference to give to linked topics.
//
_link = new TopicLinkI(_subscribers);
@@ -371,8 +449,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
}
//
- // Create the Subscriber object and add to the set of
- // Subscribers.
+ // Create the subscriber object and add it to the set of
+ // subscribers.
//
SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost);
_subscribers->add(subscriber);
@@ -390,6 +468,13 @@ TopicI::getName(const Ice::Current&) const
return _name;
}
+string
+TopicI::getType(const Ice::Current&) const
+{
+ // Immutable
+ return _type;
+}
+
Ice::ObjectPrx
TopicI::getPublisher(const Ice::Current&) const
{
@@ -409,8 +494,7 @@ TopicI::destroy(const Ice::Current&)
_destroyed = true;
//
- // See the comment in the constructor for the derevation of the
- // Identity.
+ // See the comment in the constructor for the format of the identity.
//
Ice::Identity id;
id.category = _name;
@@ -429,6 +513,71 @@ TopicI::destroy(const Ice::Current&)
}
void
+TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&)
+{
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if(_destroyed)
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+
+ Ice::Identity ident = subscriber->ice_getIdentity();
+ if(_traceLevels->topic > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
+ out << "Subscribe: " << Ice::identityToString(ident);
+ if(_traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
+ {
+ if(qi != qos.begin())
+ {
+ out << ',';
+ }
+ out << '[' << qi->first << "," << qi->second << ']';
+ }
+ }
+ }
+
+ reap();
+
+ //
+ // Add this subscriber to the set of subscribers.
+ //
+ SubscriberPtr sub = _factory->createSubscriber(qos, subscriber);
+ _subscribers->add(sub);
+}
+
+void
+TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
+{
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if(_destroyed)
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+
+ Ice::Identity ident = subscriber->ice_getIdentity();
+
+ if(_traceLevels->topic > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
+
+ out << "Unsubscribe: " << Ice::identityToString(ident);
+ }
+
+ reap();
+
+ //
+ // Unsubscribe the subscriber with this identity.
+ //
+ _subscribers->remove(subscriber);
+}
+
+void
TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -464,8 +613,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
dbInfo.info.cost = cost;
//
- // Create the Subscriber object and add to the setup of
- // subscribers.
+ // Create the subscriber object and add it to the set of subscribers.
//
SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost);
_subscribers->add(subscriber);
@@ -544,43 +692,6 @@ TopicI::destroyed() const
}
void
-TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos)
-{
- IceUtil::RecMutex::Lock sync(*this);
-
- if(_destroyed)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
-
- reap();
-
- //
- // Add this subscriber to the set of subscribers.
- //
- SubscriberPtr subscriber = _factory->createSubscriber(qos, obj);
- _subscribers->add(subscriber);
-}
-
-void
-TopicI::unsubscribe(const Ice::ObjectPrx& obj)
-{
- IceUtil::RecMutex::Lock sync(*this);
-
- if(_destroyed)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
-
- reap();
-
- //
- // Unsubscribe the subscriber with this identity.
- //
- _subscribers->remove(obj);
-}
-
-void
TopicI::reap()
{
IceUtil::RecMutex::Lock sync(*this);