summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp190
1 files changed, 190 insertions, 0 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
new file mode 100644
index 00000000000..f04198df675
--- /dev/null
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -0,0 +1,190 @@
+// **********************************************************************
+//
+// Copyright (c) 2001
+// MutableRealms, Inc.
+// Huntsville, AL, USA
+//
+// All Rights Reserved
+//
+// **********************************************************************
+
+#include <Ice/Ice.h>
+
+#include <IceStorm/TopicManagerI.h>
+#include <IceStorm/TopicI.h>
+#include <IceStorm/Flusher.h>
+#include <IceStorm/TraceLevels.h>
+
+#include <functional>
+
+using namespace IceStorm;
+using namespace std;
+
+TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter,
+ const TraceLevelsPtr& traceLevels) :
+ _communicator(communicator),
+ _adapter(adapter),
+ _traceLevels(traceLevels)
+
+{
+ _flusher = new Flusher(_communicator, _traceLevels);
+}
+
+TopicManagerI::~TopicManagerI()
+{
+}
+
+TopicPrx
+TopicManagerI::create(const string& name)
+{
+ // TODO: reader/writer mutex
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ if (_topicIMap.find(name) != _topicIMap.end())
+ {
+ throw TopicExists();
+ }
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Create " << name;
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ //
+ // Create topic implementation
+ //
+ TopicIPtr topicI = new TopicI(_adapter, _traceLevels, _communicator->getLogger(), name, _flusher);
+ _adapter->add(topicI, name);
+ _topicIMap.insert(TopicIMap::value_type(name, topicI));
+
+ return TopicPrx::uncheckedCast(_adapter->createProxy(name));
+}
+
+TopicPrx
+TopicManagerI::retrieve(const string& name)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ if (_topicIMap.find(name) != _topicIMap.end())
+ return TopicPrx::uncheckedCast(_adapter->createProxy(name));
+ throw NoSuchTopic();
+}
+
+//
+// The arguments cannot be const & (for some reason)
+//
+static TopicDict::value_type
+transformToTopicDict(TopicIMap::value_type p, Ice::ObjectAdapterPtr adapter)
+{
+ return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(adapter->createProxy(p.first)));
+}
+
+TopicDict
+TopicManagerI::retrieveAll()
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ reap();
+
+ TopicDict all;
+ transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()),
+ bind2nd(ptr_fun(transformToTopicDict), _adapter));
+
+ return all;
+}
+
+void
+TopicManagerI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos, const StringSeq& topics)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Subscribe: " << id;
+ if (_traceLevels->topicMgr > 1)
+ {
+ s << " QoS: ";
+ for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
+ {
+ if (qi != qos.begin())
+ {
+ s << ',';
+ }
+ s << '[' << qi->first << "," << qi->second << ']';
+ }
+ s << " Topics: ";
+ for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
+ {
+ if (ti != topics.begin())
+ {
+ s << ",";
+ }
+ s << *ti;
+ }
+ }
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+ {
+ TopicIMap::iterator elem = _topicIMap.find(*i);
+ if (elem != _topicIMap.end())
+ {
+ elem->second->subscribe(tmpl, id, qos);
+ }
+ }
+}
+
+void
+TopicManagerI::unsubscribe(const string& id, const StringSeq& topics)
+{
+ JTCSyncT<JTCMutex> sync(*this);
+
+ if (_traceLevels->topicMgr > 0)
+ {
+ ostringstream s;
+ s << "Unsubscribe: " << id;
+ if (_traceLevels->topicMgr > 1)
+ {
+ s << " Topics: ";
+ for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
+ {
+ if (ti != topics.begin())
+ {
+ s << ",";
+ }
+ s << *ti;
+ }
+ }
+ _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ }
+
+ for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+ {
+ TopicIMap::iterator elem = _topicIMap.find(*i);
+ if (elem != _topicIMap.end())
+ {
+ elem->second->unsubscribe(id);
+ }
+ }
+}
+
+void
+TopicManagerI::shutdown()
+{
+ _flusher->stopFlushing();
+ _communicator->shutdown();
+}
+
+void
+TopicManagerI::reap()
+{
+}
+