diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 255 |
1 files changed, 183 insertions, 72 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index edc1c1c3fd2..3edda8722e2 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -41,8 +41,8 @@ class PublisherProxyI : public Ice::Blobject { public: - PublisherProxyI(const IceStorm::TopicSubscribersPtr& s) : - _subscribers(s) + PublisherProxyI(const IceStorm::TopicSubscribersPtr& s, const string& type) : + _subscribers(s), _type(type) { } @@ -58,11 +58,16 @@ private: // Set of associated subscribers // IceStorm::TopicSubscribersPtr _subscribers; + + // + // The topic type + // + string _type; }; // // The servant has a 1-1 association with a topic. It is used to -// receive events from linked Topics.. +// receive events from linked Topics. // class TopicLinkI : public TopicLink { @@ -115,8 +120,8 @@ public: // If a subscriber with this identity is already subscribed // then mark the subscriber as replaced. // - // Note that this doesn't actually remove the Subscribe from - // the list of subscribers - it marks the Subscriber as + // Note that this doesn't actually remove the subscriber from + // the list of subscribers - it marks the subscriber as // replaced, and it's removed on the next event publish. // for(SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i) @@ -132,17 +137,16 @@ public: } } - // - // Add to the set of subscribers + // Add to the set of subscribers. // _subscribers.push_back(subscriber); } // - // Unsubscribe the Subscriber with the given identity. Note that - // this doesn't remove the Subscriber from the list of subscribers - // - it marks the Subscriber as unsubscribed, and it's removed on + // Unsubscribe the subscriber with the given identity. Note that + // this doesn't remove the subscriber from the list of subscribers + // - it marks the subscriber as unsubscribed, and it's removed on // the next event publish. // void @@ -189,7 +193,7 @@ public: publish(const Event& event) { // - // Copy of the subscriber list so that event publishing can + // Copy the subscriber list so that event publishing can // occur in parallel. // // TODO: Find out whether this is a false optimization - how @@ -286,27 +290,100 @@ PublisherProxyI::ice_invoke(const vector< Ice::Byte>& inParams, vector< Ice::Byt { const Ice::Context& context = current.ctx; - Event event; - event.forwarded = false; - Ice::Context::const_iterator p = context.find("cost"); - if(p != context.end()) + // + // Intercept the operations ice_isA, ice_id, and ice_ids so that + // this publisher object can appear to have the topic type. + // + if(current.operation == "ice_isA") { - event.cost = atoi(p->second.c_str()); + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream is(instance.get()); + is.b = inParams; + is.i = is.b.begin(); + string type; + is.read(type); + bool b; + if(type == _type) + { + b = true; + } + else + { + b = Ice::Blobject::ice_isA(type); + } + IceInternal::BasicStream os(instance.get()); + os.write(b); + outParam = os.b; + } + else if(current.operation == "ice_id") + { + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream os(instance.get()); + os.write(_type); + outParam = os.b; + } + else if(current.operation == "ice_ids") + { + IceInternal::InstancePtr instance = IceInternal::getInstance(current.adapter->getCommunicator()); + IceInternal::BasicStream os(instance.get()); + vector<string> ids = Ice::Blobject::ice_ids(); + ids.insert(ids.begin(), _type); + os.write(ids); + outParam = os.b; } else { - event.cost = 0; // TODO: Default comes from property? + Event event; + event.forwarded = false; + Ice::Context::const_iterator p = context.find("cost"); + if(p != context.end()) + { + event.cost = atoi(p->second.c_str()); + } + else + { + event.cost = 0; // TODO: Default comes from property? + } + event.op = current.operation; + event.mode = current.mode; + event.data = inParams; + event.context = context; + + _subscribers->publish(event); } - event.op = current.operation; - event.mode = current.mode; - event.data = inParams; - event.context = context; - - _subscribers->publish(event); return true; } +#if 0 +bool +PublisherProxyI::ice_isA(const string& id, const Ice::Current& current) +{ + if(id == _type) + { + return true; + } + else + { + return Ice::Blobject::ice_isA(id, current); + } +} + +vector<string> +PublisherProxyI::ice_ids(const Ice::Current& current) +{ + vector<string> ids = Ice::Blobject::ice_ids(current); + ids.insert(ids.begin(), _type); + return ids; +} + +const string& +PublisherProxyI::ice_id(const Ice::Current&) +{ + return _type; +} +#endif + // // Incoming events from linked topics. // @@ -326,10 +403,11 @@ TopicLinkI::forward(const string& op, Ice::OperationMode mode, const ByteSeq& da } TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const string& name, - const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) : + const string& type, const SubscriberFactoryPtr& factory, const Freeze::DBPtr& db) : _adapter(adapter), _traceLevels(traceLevels), _name(name), + _type(type), _factory(factory), _destroyed(false), _links(db), @@ -338,11 +416,11 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace _subscribers = new TopicSubscribers(_traceLevels); // - // Create a servant per Topic to receive event data. The servants - // Identity is category=<topicname>, name=publish. Activate the + // Create a servant per topic to receive event data. The servant's + // identity is category=<topicname>, name=publish. Activate the // object and save a reference to give to publishers. // - _publisher = new PublisherProxyI(_subscribers); + _publisher = new PublisherProxyI(_subscribers, type); Ice::Identity id; id.category = _name; @@ -350,8 +428,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace _publisherPrx = _adapter->add(_publisher, id); // - // Create a servant per Topic to receive linked event data. The - // servants Identity is category=<topicname>, name=link. Activate + // Create a servant per topic to receive linked event data. The + // servant's identity is category=<topicname>, name=link. Activate // the object and save a reference to give to linked topics. // _link = new TopicLinkI(_subscribers); @@ -371,8 +449,8 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace } // - // Create the Subscriber object and add to the set of - // Subscribers. + // Create the subscriber object and add it to the set of + // subscribers. // SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost); _subscribers->add(subscriber); @@ -390,6 +468,13 @@ TopicI::getName(const Ice::Current&) const return _name; } +string +TopicI::getType(const Ice::Current&) const +{ + // Immutable + return _type; +} + Ice::ObjectPrx TopicI::getPublisher(const Ice::Current&) const { @@ -409,8 +494,7 @@ TopicI::destroy(const Ice::Current&) _destroyed = true; // - // See the comment in the constructor for the derevation of the - // Identity. + // See the comment in the constructor for the format of the identity. // Ice::Identity id; id.category = _name; @@ -429,6 +513,71 @@ TopicI::destroy(const Ice::Current&) } void +TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&) +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + Ice::Identity ident = subscriber->ice_getIdentity(); + if(_traceLevels->topic > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + out << "Subscribe: " << Ice::identityToString(ident); + if(_traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) + { + if(qi != qos.begin()) + { + out << ','; + } + out << '[' << qi->first << "," << qi->second << ']'; + } + } + } + + reap(); + + // + // Add this subscriber to the set of subscribers. + // + SubscriberPtr sub = _factory->createSubscriber(qos, subscriber); + _subscribers->add(sub); +} + +void +TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) +{ + IceUtil::RecMutex::Lock sync(*this); + + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + Ice::Identity ident = subscriber->ice_getIdentity(); + + if(_traceLevels->topic > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat); + + out << "Unsubscribe: " << Ice::identityToString(ident); + } + + reap(); + + // + // Unsubscribe the subscriber with this identity. + // + _subscribers->remove(subscriber); +} + +void TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) { IceUtil::RecMutex::Lock sync(*this); @@ -464,8 +613,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) dbInfo.info.cost = cost; // - // Create the Subscriber object and add to the setup of - // subscribers. + // Create the subscriber object and add it to the set of subscribers. // SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost); _subscribers->add(subscriber); @@ -544,43 +692,6 @@ TopicI::destroyed() const } void -TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos) -{ - IceUtil::RecMutex::Lock sync(*this); - - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - - reap(); - - // - // Add this subscriber to the set of subscribers. - // - SubscriberPtr subscriber = _factory->createSubscriber(qos, obj); - _subscribers->add(subscriber); -} - -void -TopicI::unsubscribe(const Ice::ObjectPrx& obj) -{ - IceUtil::RecMutex::Lock sync(*this); - - if(_destroyed) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - - reap(); - - // - // Unsubscribe the subscriber with this identity. - // - _subscribers->remove(obj); -} - -void TopicI::reap() { IceUtil::RecMutex::Lock sync(*this); |