// // Copyright (c) ZeroC, Inc. All rights reserved. // #include #include #include #include #include #include #include using namespace IceStorm; using namespace std; TransientTopicManagerImpl::TransientTopicManagerImpl(const InstancePtr& instance) : _instance(instance) { } TransientTopicManagerImpl::~TransientTopicManagerImpl() { } TopicPrx TransientTopicManagerImpl::create(const string& name, const Ice::Current&) { Lock sync(*this); reap(); if(_topics.find(name) != _topics.end()) { throw TopicExists(name); } Ice::Identity id = IceStormInternal::nameToIdentity(_instance, name); // // Called by constructor or with 'this' mutex locked. // TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); out << "creating new topic \"" << name << "\". id: " << _instance->communicator()->identityToString(id); } // // Create topic implementation // TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id); // // The identity is the name of the Topic. // TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id)); _topics.insert(map::value_type(name, topicImpl)); return prx; } TopicPrx TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const { Lock sync(*this); TransientTopicManagerImpl* This = const_cast(this); This->reap(); map::const_iterator p = _topics.find(name); if(p == _topics.end()) { throw NoSuchTopic(name); } // Here we cannot just reconstruct the identity since the // identity could be either instanceName/topic name, or if // created with pre-3.2 IceStorm / topic name. return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id())); } TopicDict TransientTopicManagerImpl::retrieveAll(const Ice::Current&) const { Lock sync(*this); TransientTopicManagerImpl* This = const_cast(this); This->reap(); TopicDict all; for(map::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { // // Here we cannot just reconstruct the identity since the // identity could be either "/topic." // name, or if created with pre-3.2 IceStorm "/". // all.insert(TopicDict::value_type( p->first, TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id())))); } return all; } IceStormElection::NodePrx TransientTopicManagerImpl::getReplicaNode(const Ice::Current&) const { return IceStormElection::NodePrx(); } void TransientTopicManagerImpl::reap() { // // Always called with mutex locked. // // Lock sync(*this); // vector reaped = _instance->topicReaper()->consumeReapedTopics(); for(vector::const_iterator p = reaped.begin(); p != reaped.end(); ++p) { map::iterator i = _topics.find(*p); if(i != _topics.end() && i->second->destroyed()) { Ice::Identity id = i->second->id(); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); out << "Reaping " << i->first; } try { _instance->topicAdapter()->remove(id); } catch(const Ice::ObjectAdapterDeactivatedException&) { // Ignore } _topics.erase(i); } } } void TransientTopicManagerImpl::shutdown() { Lock sync(*this); for(map::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { p->second->shutdown(); } }