diff options
Diffstat (limited to 'cpp/src/IceStorm/TransientTopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TransientTopicManagerI.cpp | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TransientTopicManagerI.cpp b/cpp/src/IceStorm/TransientTopicManagerI.cpp new file mode 100644 index 00000000000..a928007c975 --- /dev/null +++ b/cpp/src/IceStorm/TransientTopicManagerI.cpp @@ -0,0 +1,185 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/DisableWarnings.h> +#include <IceStorm/TransientTopicManagerI.h> +#include <IceStorm/TransientTopicI.h> +#include <IceStorm/TraceLevels.h> +#include <IceStorm/Instance.h> + +#include <Ice/Ice.h> + +#include <Ice/SliceChecksums.h> + +#include <functional> + +using namespace IceStorm; +using namespace std; + +namespace IceStorm +{ + +extern Ice::Identity nameToIdentity(const InstancePtr&, const string&); + +} + +TransientTopicManagerImpl::TransientTopicManagerImpl(const InstancePtr& instance) : + _instance(instance) +{ +} + +TransientTopicManagerImpl::~TransientTopicManagerImpl() +{ +} + +TopicPrx +TransientTopicManagerImpl::create(const string& name, const Ice::Current&) +{ + Lock sync(*this); + + reap(); + + if(_topics.find(name) != _topics.end()) + { + TopicExists ex; + ex.name = name; + throw ex; + } + + Ice::Identity id = nameToIdentity(_instance, name); + + // + // Called by constructor or with 'this' mutex locked. + // + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topicMgr > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); + out << "creating new topic \"" << name << "\". id: " + << _instance->communicator()->identityToString(id); + } + + // + // Create topic implementation + // + TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id); + + // + // The identity is the name of the Topic. + // + TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id)); + _topics.insert(map<string, TransientTopicImplPtr>::value_type(name, topicImpl)); + return prx; +} + +TopicPrx +TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const +{ + Lock sync(*this); + + TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this); + This->reap(); + + map<string, TransientTopicImplPtr>::const_iterator p = _topics.find(name); + if(p == _topics.end()) + { + NoSuchTopic ex; + ex.name = name; + throw ex; + } + + // Here we cannot just reconstruct the identity since the + // identity could be either instanceName/topic name, or if + // created with pre-3.2 IceStorm / topic name. + return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id())); +} + +TopicDict +TransientTopicManagerImpl::retrieveAll(const Ice::Current&) const +{ + Lock sync(*this); + + TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this); + This->reap(); + + TopicDict all; + for(map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + { + // + // Here we cannot just reconstruct the identity since the + // identity could be either "<instanceName>/topic.<topicname>" + // name, or if created with pre-3.2 IceStorm "/<topicname>". + // + all.insert(TopicDict::value_type( + p->first, TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id())))); + } + + return all; +} +Ice::SliceChecksumDict +TransientTopicManagerImpl::getSliceChecksums(const Ice::Current&) const +{ + return Ice::sliceChecksums(); +} + +IceStormElection::NodePrx +TransientTopicManagerImpl::getReplicaNode(const Ice::Current&) const +{ + return IceStormElection::NodePrx(); +} + +void +TransientTopicManagerImpl::reap() +{ + // + // Always called with mutex locked. + // + // Lock sync(*this); + // + map<string, TransientTopicImplPtr>::iterator i = _topics.begin(); + while(i != _topics.end()) + { + if(i->second->destroyed()) + { + Ice::Identity id = i->second->id(); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topicMgr > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); + out << "Reaping " << i->first; + } + + try + { + _instance->topicAdapter()->remove(id); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + // Ignore + } + + _topics.erase(i++); + } + else + { + ++i; + } + } +} + +void +TransientTopicManagerImpl::shutdown() +{ + Lock sync(*this); + + for(map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p) + { + p->second->shutdown(); + } +} |