summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TransientTopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TransientTopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TransientTopicManagerI.cpp185
1 files changed, 185 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TransientTopicManagerI.cpp b/cpp/src/IceStorm/TransientTopicManagerI.cpp
new file mode 100644
index 00000000000..a928007c975
--- /dev/null
+++ b/cpp/src/IceStorm/TransientTopicManagerI.cpp
@@ -0,0 +1,185 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <IceStorm/TransientTopicManagerI.h>
+#include <IceStorm/TransientTopicI.h>
+#include <IceStorm/TraceLevels.h>
+#include <IceStorm/Instance.h>
+
+#include <Ice/Ice.h>
+
+#include <Ice/SliceChecksums.h>
+
+#include <functional>
+
+using namespace IceStorm;
+using namespace std;
+
+namespace IceStorm
+{
+
+extern Ice::Identity nameToIdentity(const InstancePtr&, const string&);
+
+}
+
+TransientTopicManagerImpl::TransientTopicManagerImpl(const InstancePtr& instance) :
+ _instance(instance)
+{
+}
+
+TransientTopicManagerImpl::~TransientTopicManagerImpl()
+{
+}
+
+TopicPrx
+TransientTopicManagerImpl::create(const string& name, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ reap();
+
+ if(_topics.find(name) != _topics.end())
+ {
+ TopicExists ex;
+ ex.name = name;
+ throw ex;
+ }
+
+ Ice::Identity id = nameToIdentity(_instance, name);
+
+ //
+ // Called by constructor or with 'this' mutex locked.
+ //
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topicMgr > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
+ out << "creating new topic \"" << name << "\". id: "
+ << _instance->communicator()->identityToString(id);
+ }
+
+ //
+ // Create topic implementation
+ //
+ TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id);
+
+ //
+ // The identity is the name of the Topic.
+ //
+ TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id));
+ _topics.insert(map<string, TransientTopicImplPtr>::value_type(name, topicImpl));
+ return prx;
+}
+
+TopicPrx
+TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const
+{
+ Lock sync(*this);
+
+ TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
+ This->reap();
+
+ map<string, TransientTopicImplPtr>::const_iterator p = _topics.find(name);
+ if(p == _topics.end())
+ {
+ NoSuchTopic ex;
+ ex.name = name;
+ throw ex;
+ }
+
+ // Here we cannot just reconstruct the identity since the
+ // identity could be either instanceName/topic name, or if
+ // created with pre-3.2 IceStorm / topic name.
+ return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()));
+}
+
+TopicDict
+TransientTopicManagerImpl::retrieveAll(const Ice::Current&) const
+{
+ Lock sync(*this);
+
+ TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
+ This->reap();
+
+ TopicDict all;
+ for(map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
+ {
+ //
+ // Here we cannot just reconstruct the identity since the
+ // identity could be either "<instanceName>/topic.<topicname>"
+ // name, or if created with pre-3.2 IceStorm "/<topicname>".
+ //
+ all.insert(TopicDict::value_type(
+ p->first, TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()))));
+ }
+
+ return all;
+}
+Ice::SliceChecksumDict
+TransientTopicManagerImpl::getSliceChecksums(const Ice::Current&) const
+{
+ return Ice::sliceChecksums();
+}
+
+IceStormElection::NodePrx
+TransientTopicManagerImpl::getReplicaNode(const Ice::Current&) const
+{
+ return IceStormElection::NodePrx();
+}
+
+void
+TransientTopicManagerImpl::reap()
+{
+ //
+ // Always called with mutex locked.
+ //
+ // Lock sync(*this);
+ //
+ map<string, TransientTopicImplPtr>::iterator i = _topics.begin();
+ while(i != _topics.end())
+ {
+ if(i->second->destroyed())
+ {
+ Ice::Identity id = i->second->id();
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topicMgr > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
+ out << "Reaping " << i->first;
+ }
+
+ try
+ {
+ _instance->topicAdapter()->remove(id);
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // Ignore
+ }
+
+ _topics.erase(i++);
+ }
+ else
+ {
+ ++i;
+ }
+ }
+}
+
+void
+TransientTopicManagerImpl::shutdown()
+{
+ Lock sync(*this);
+
+ for(map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
+ {
+ p->second->shutdown();
+ }
+}