summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicManagerI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp226
1 files changed, 123 insertions, 103 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 53de6965b46..90658ba7470 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -19,15 +19,17 @@
#include <IceStorm/TraceLevels.h>
#include <functional>
+#include <ctype.h>
using namespace IceStorm;
using namespace std;
-TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter,
- const TraceLevelsPtr& traceLevels, const Freeze::DBEnvironmentPtr& dbEnv,
- const Freeze::DBPtr& db) :
+TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& topicAdapter,
+ const Ice::ObjectAdapterPtr& publishAdapter, const TraceLevelsPtr& traceLevels,
+ const Freeze::DBEnvironmentPtr& dbEnv, const Freeze::DBPtr& db) :
_communicator(communicator),
- _adapter(adapter),
+ _topicAdapter(topicAdapter),
+ _publishAdapter(publishAdapter),
_traceLevels(traceLevels),
_dbEnv(dbEnv),
_topics(db)
@@ -39,15 +41,15 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice
// Recreate each of the topics in the dictionary. If the topic
// database doesn't exist then the topic was previously destroyed,
// but not removed from the _topics dictionary. Normally this
- // should only occur upon a crash.
+ // should only occur in the event of a crash.
//
- StringBoolDict::iterator p = _topics.begin();
+ StringStringDict::iterator p = _topics.begin();
while(p != _topics.end())
{
assert(_topicIMap.find(p->first) == _topicIMap.end());
try
{
- installTopic("recreate", p->first, false);
+ installTopic("recreate", p->first, p->second, false);
++p;
}
catch(const Freeze::DBNotFoundException& ex)
@@ -57,7 +59,7 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice
Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
out << ex;
}
- StringBoolDict::iterator tmp = p;
+ StringStringDict::iterator tmp = p;
++p;
_topics.erase(tmp);
}
@@ -69,8 +71,10 @@ TopicManagerI::~TopicManagerI()
}
TopicPrx
-TopicManagerI::create(const string& name, const Ice::Current&)
+TopicManagerI::create(const string& name, const string& type, const Ice::Current&)
{
+ validateType(type);
+
// TODO: reader/writer mutex
IceUtil::Mutex::Lock sync(*this);
@@ -83,15 +87,15 @@ TopicManagerI::create(const string& name, const Ice::Current&)
throw ex;
}
- installTopic("create", name, true);
- _topics.insert(make_pair(name, true));
+ installTopic("create", name, type, true);
+ _topics.insert(make_pair(name, type));
//
// The identity is the name of the Topic.
//
Ice::Identity id;
id.name = name;
- return TopicPrx::uncheckedCast(_adapter->createProxy(id));
+ return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id));
}
TopicPrx
@@ -106,7 +110,7 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const
{
Ice::Identity id;
id.name = name;
- return TopicPrx::uncheckedCast(_adapter->createProxy(id));
+ return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id));
}
NoSuchTopic ex;
@@ -119,21 +123,20 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const
//
struct TransformToTopicDict : public std::unary_function<TopicIMap::value_type, TopicDict::value_type>
{
+ TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) :
+ _adapter(adapter)
+ {
+ }
- TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) :
- _adapter(adapter)
- {
- }
-
- TopicDict::value_type
- operator()(TopicIMap::value_type p)
- {
- Ice::Identity id;
- id.name = p.first;
- return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id)));
- }
+ TopicDict::value_type
+ operator()(TopicIMap::value_type p)
+ {
+ Ice::Identity id;
+ id.name = p.first;
+ return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id)));
+ }
- Ice::ObjectAdapterPtr _adapter;
+ Ice::ObjectAdapterPtr _adapter;
};
TopicDict
@@ -146,83 +149,12 @@ TopicManagerI::retrieveAll(const Ice::Current&) const
TopicDict all;
transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()),
- TransformToTopicDict(_adapter));
+ TransformToTopicDict(_topicAdapter));
return all;
}
void
-TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&)
-{
- IceUtil::Mutex::Lock sync(*this);
-
- Ice::Identity ident = subscriber->ice_getIdentity();
- if(_traceLevels->topicMgr > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
- out << "Subscribe: " << Ice::identityToString(ident);
- if(_traceLevels->topicMgr > 1)
- {
- out << " QoS: ";
- for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
- {
- if(qi != qos.begin())
- {
- out << ',';
- }
- out << '[' << qi->first << "," << qi->second << ']';
- }
- }
- }
-
- //
- // Ensure that the identities category refers to an existing
- // channel.
- //
- TopicIMap::iterator elem = _topicIMap.find(ident.category);
- if(elem == _topicIMap.end())
- {
- NoSuchTopic ex;
- ex.name = ident.category;
- throw ex;
- }
-
- //
- // Subscribe to the topic.
- //
- elem->second->subscribe(subscriber, qos);
-}
-
-void
-TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
-{
- IceUtil::Mutex::Lock sync(*this);
-
- Ice::Identity ident = subscriber->ice_getIdentity();
-
- if(_traceLevels->topicMgr > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
-
- out << "Unsubscribe: " << Ice::identityToString(ident);
- }
-
-
- TopicIMap::iterator elem = _topicIMap.find(ident.category);
- if(elem != _topicIMap.end())
- {
- elem->second->unsubscribe(subscriber);
- }
-}
-
-void
-TopicManagerI::shutdown(const Ice::Current&)
-{
- _flusher->stopFlushing();
- _communicator->shutdown();
-}
-
-void
TopicManagerI::reap()
{
//
@@ -251,7 +183,7 @@ TopicManagerI::reap()
}
void
-TopicManagerI::installTopic(const std::string& message, const std::string& name, bool create)
+TopicManagerI::installTopic(const string& message, const string& name, const string& type, bool create)
{
if(_traceLevels->topicMgr > 0)
{
@@ -259,21 +191,109 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name,
out << message << ' ' << name;
}
+ //
+ // Prepend "topic-" to the topic name in order to form a
+ // unique name for the Freeze database. Since the name we
+ // supply to openDB is also used as a filename, we call
+ // getDatabaseName to obtain a name with any questionable
+ // filename characters converted to hex.
+ //
// TODO: instance
- // TODO: reserved names?
// TODO: failure? cleanup database?
- Freeze::DBPtr db = _dbEnv->openDB(name, create);
+ //
+ string dbName = "topic-" + getDatabaseName(name);
+ Freeze::DBPtr db = _dbEnv->openDB(dbName, create);
//
// Create topic implementation
//
- TopicIPtr topicI = new TopicI(_adapter, _traceLevels, name, _factory, db);
+ TopicIPtr topicI = new TopicI(_publishAdapter, _traceLevels, name, type, _factory, db);
//
// The identity is the name of the Topic.
//
Ice::Identity id;
id.name = name;
- _adapter->add(topicI, id);
+ _topicAdapter->add(topicI, id);
_topicIMap.insert(TopicIMap::value_type(name, topicI));
}
+
+void
+TopicManagerI::validateType(const string& type)
+{
+ bool fail = false;
+ const string::size_type len = type.size();
+ string::size_type pos = type.find("::");
+ if(pos != 0)
+ {
+ fail = true;
+ }
+
+ bool checkAlpha = false;
+ while(!fail && pos < len)
+ {
+ if(checkAlpha)
+ {
+ if(!isalpha(type[pos]))
+ {
+ fail = true;
+ }
+ else
+ {
+ checkAlpha = false;
+ }
+ }
+ else if(type[pos] == ':')
+ {
+ pos++;
+ if(pos == len || type[pos] != ':')
+ {
+ fail = true;
+ }
+ else
+ {
+ checkAlpha = true;
+ }
+ }
+ else if(!isalnum(type[pos]))
+ {
+ fail = true;
+ }
+ pos++;
+ }
+
+ if(checkAlpha) // type ended with "::"
+ {
+ fail = true;
+ }
+
+ if(fail)
+ {
+ InvalidType ex;
+ ex.type = type;
+ throw ex;
+ }
+}
+
+string
+TopicManagerI::getDatabaseName(const string& name)
+{
+ string result;
+ result.reserve(name.size());
+
+ for(string::size_type i = 0; i < name.size(); i++)
+ {
+ if(isalnum(name[i]) || name[i] == '.' || name[i] == '-' || name[i] == '_')
+ {
+ result.push_back(name[i]);
+ }
+ else
+ {
+ ostringstream ostr;
+ ostr << '%' << hex << static_cast<int>(name[i]);
+ result.append(ostr.str());
+ }
+ }
+
+ return result;
+}