diff options
Diffstat (limited to 'cpp/src/IceStorm/TransientTopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TransientTopicI.cpp | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TransientTopicI.cpp b/cpp/src/IceStorm/TransientTopicI.cpp new file mode 100644 index 00000000000..e70f013736f --- /dev/null +++ b/cpp/src/IceStorm/TransientTopicI.cpp @@ -0,0 +1,598 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/DisableWarnings.h> +#include <IceStorm/TransientTopicI.h> +#include <IceStorm/Instance.h> +#include <IceStorm/Subscriber.h> +#include <IceStorm/TraceLevels.h> + +#include <Ice/Ice.h> + +#include <list> +#include <algorithm> + +using namespace IceStorm; +using namespace std; + +namespace +{ + +// +// The servant has a 1-1 association with a topic. It is used to +// receive events from Publishers. +// +class TransientPublisherI : public Ice::BlobjectArray +{ +public: + + TransientPublisherI(const TransientTopicImplPtr& impl) : + _impl(impl) + { + } + + virtual bool + ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + Ice::ByteSeq&, + const Ice::Current& current) + { + // Use cached reads. + EventDataPtr event = new EventData( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); + + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); + + EventDataSeq v; + v.push_back(event); + _impl->publish(false, v); + + return true; + } + +private: + + const TransientTopicImplPtr _impl; +}; + +// +// The servant has a 1-1 association with a topic. It is used to +// receive events from linked Topics. +// +class TransientTopicLinkI : public TopicLink +{ +public: + + TransientTopicLinkI(const TransientTopicImplPtr& impl) : + _impl(impl) + { + } + + virtual void + forward(const EventDataSeq& v, const Ice::Current& current) + { + _impl->publish(true, v); + } + +private: + + const TransientTopicImplPtr _impl; +}; + +} + +namespace IceStorm +{ +extern string identityToTopicName(const Ice::Identity& id); +} + +TransientTopicImpl::TransientTopicImpl( + const InstancePtr& instance, + const string& name, + const Ice::Identity& id) : + _instance(instance), + _name(name), + _id(id), + _destroyed(false) +{ + // + // Create a servant per topic to receive event data. If the + // category is empty then we are in backwards compatibility + // mode. In this case the servant's identity is + // category=<topicname>, name=publish, otherwise the name is + // <instancename>/<topicname>.publish. The same applies to the + // link proxy. + // + // Activate the object and save a reference to give to publishers. + // + Ice::Identity pubid; + Ice::Identity linkid; + if(id.category.empty()) + { + pubid.category = _name; + pubid.name = "publish"; + linkid.category = _name; + linkid.name = "link"; + } + else + { + pubid.category = id.category; + pubid.name = _name + ".publish"; + linkid.category = id.category; + linkid.name = _name + ".link"; + } + + _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid); + _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid)); +} + +TransientTopicImpl::~TransientTopicImpl() +{ +} + +string +TransientTopicImpl::getName(const Ice::Current&) const +{ + // Immutable + return _name; +} + +Ice::ObjectPrx +TransientTopicImpl::getPublisher(const Ice::Current&) const +{ + // Immutable + return _publisherPrx; +} + +Ice::ObjectPrx +TransientTopicImpl::getNonReplicatedPublisher(const Ice::Current&) const +{ + // Immutable + return _publisherPrx; +} + +// +// COMPILERFIX: For some reason with VC6 find reports an error. +// +#if defined(_MSC_VER) && (_MSC_VER < 1300) +namespace +{ +static vector<SubscriberPtr>::iterator +find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident) +{ + while(start != end) + { + if(*start == ident) + { + return start; + } + ++start; + } + return end; +} +} +#endif + +void +TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) +{ + Ice::Identity id = obj->ice_getIdentity(); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + QoS qos = origQoS; + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": subscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } + } + + string reliability = "oneway"; + { + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } + } + + Ice::ObjectPrx newObj = obj; + if(reliability == "batch") + { + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } + } + else if(reliability == "twoway") + { + newObj = newObj->ice_twoway(); + } + else if(reliability == "twoway ordered") + { + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); + } + else // reliability == "oneway" + { + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } + } + + Lock sync(*this); + SubscriberRecord record; + record.id = id; + record.obj = newObj; + record.theQoS = qos; + record.topicName = _name; + record.link = false; + record.cost = 0; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); + if(p != _subscribers.end()) + { + // If we already have this subscriber remove it from our + // subscriber list and remove it from the database. + (*p)->destroy(); + _subscribers.erase(p); + } + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + _subscribers.push_back(subscriber); +} + +Ice::ObjectPrx +TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +{ + Ice::Identity id = obj->ice_getIdentity(); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + + } + } + } + + Lock sync(*this); + + SubscriberRecord record; + record.id = id; + record.obj = obj; + record.theQoS = qos; + record.topicName = _name; + record.link = false; + record.cost = 0; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); + if(p != _subscribers.end()) + { + throw AlreadySubscribed(); + } + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + _subscribers.push_back(subscriber); + + return subscriber->proxy(); +} + +void +TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(!subscriber) + { + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "unsubscribe with null subscriber."; + } + return; + } + + Ice::Identity id = subscriber->ice_getIdentity(); + + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id); + } + + Lock sync(*this); + // First remove the subscriber from the subscribers list. Note + // that its possible that the subscriber isn't in the list, but is + // in the database if the subscriber was locally reaped. + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _subscribers.erase(p); + } +} + +TopicLinkPrx +TransientTopicImpl::getLinkProxy(const Ice::Current&) +{ + // immutable + return _linkPrx; +} + +void +TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) +{ + TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic); + TopicLinkPrx link = internal->getLinkProxy(); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity()) + << " cost " << cost; + } + + Lock sync(*this); + + Ice::Identity id = topic->ice_getIdentity(); + + SubscriberRecord record; + record.id = id; + record.obj = link; + record.theTopic = topic; + record.topicName = _name; + record.link = true; + record.cost = cost; + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id); + if(p != _subscribers.end()) + { + string name = identityToTopicName(id); + LinkExists ex; + ex.name = name; + throw ex; + } + + SubscriberPtr subscriber = Subscriber::create(_instance, record); + _subscribers.push_back(subscriber); +} + +void +TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&) +{ + Lock sync(*this); + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + + Ice::Identity id = topic->ice_getIdentity(); + + vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); + if(p == _subscribers.end()) + { + string name = identityToTopicName(id); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": unlink " << name << " failed - not linked"; + } + + NoSuchLink ex; + ex.name = name; + throw ex; + } + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << _instance->communicator()->identityToString(id); + } + + // Remove the subscriber from the subscribers list. Note + // that its possible that the subscriber isn't in the list, but is + // in the database if the subscriber was locally reaped. + p = find(_subscribers.begin(), _subscribers.end(), id); + if(p != _subscribers.end()) + { + (*p)->destroy(); + _subscribers.erase(p); + } +} + +LinkInfoSeq +TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const +{ + Lock sync(*this); + LinkInfoSeq seq; + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + SubscriberRecord record = (*p)->record(); + if(record.link && !(*p)->errored()) + { + LinkInfo info; + info.name = identityToTopicName(record.theTopic->ice_getIdentity()); + info.cost = record.cost; + info.theTopic = record.theTopic; + seq.push_back(info); + } + } + return seq; +} + +void +TransientTopicImpl::destroy(const Ice::Current&) +{ + Lock sync(*this); + + if(_destroyed) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + _destroyed = true; + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << ": destroy"; + } + + try + { + _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity()); + _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity()); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + // Ignore -- this could occur on shutdown. + } + + // Destroy all of the subscribers. + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->destroy(); + } + _subscribers.clear(); +} + +void +TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&) +{ +} + +bool +TransientTopicImpl::destroyed() const +{ + Lock sync(*this); + return _destroyed; +} + +Ice::Identity +TransientTopicImpl::id() const +{ + // immutable + return _id; +} + +void +TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events) +{ + // + // Copy of the subscriber list so that event publishing can occur + // in parallel. + // + vector<SubscriberPtr> copy; + { + Lock sync(*this); + copy = _subscribers; + } + + // + // Queue each event, gathering a list of those subscribers that + // must be reaped. + // + vector<Ice::Identity> e; + for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) + { + if(!(*p)->queue(forwarded, events) && (*p)->reap()) + { + e.push_back((*p)->id()); + } + } + + // + // Run through the error list removing those subscribers that are + // in error from the subscriber list. + // + if(!e.empty()) + { + Lock sync(*this); + for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) + { + // + // Its possible for the subscriber to already have been + // removed since the copy is iterated over outside of + // mutex protection. + // + // Note that although this could be quicker if we used a + // map, the most optimal case should be pushing around + // events not searching for a particular subscriber. + // + // The subscriber is immediately destroyed & removed from + // the _subscribers list. Add the subscriber to a list of + // error'd subscribers and remove it from the database on + // the next reap. + // + vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); + if(q != _subscribers.end()) + { + SubscriberPtr subscriber = *q; + // + // Destroy the subscriber. + // + subscriber->destroy(); + _subscribers.erase(q); + } + } + } +} + +void +TransientTopicImpl::shutdown() +{ + Lock sync(*this); + + // Shutdown each subscriber. This waits for the event queues to drain. + for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + (*p)->shutdown(); + } +} |