diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicManagerI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 226 |
1 files changed, 123 insertions, 103 deletions
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 53de6965b46..90658ba7470 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -19,15 +19,17 @@ #include <IceStorm/TraceLevels.h> #include <functional> +#include <ctype.h> using namespace IceStorm; using namespace std; -TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter, - const TraceLevelsPtr& traceLevels, const Freeze::DBEnvironmentPtr& dbEnv, - const Freeze::DBPtr& db) : +TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& topicAdapter, + const Ice::ObjectAdapterPtr& publishAdapter, const TraceLevelsPtr& traceLevels, + const Freeze::DBEnvironmentPtr& dbEnv, const Freeze::DBPtr& db) : _communicator(communicator), - _adapter(adapter), + _topicAdapter(topicAdapter), + _publishAdapter(publishAdapter), _traceLevels(traceLevels), _dbEnv(dbEnv), _topics(db) @@ -39,15 +41,15 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice // Recreate each of the topics in the dictionary. If the topic // database doesn't exist then the topic was previously destroyed, // but not removed from the _topics dictionary. Normally this - // should only occur upon a crash. + // should only occur in the event of a crash. // - StringBoolDict::iterator p = _topics.begin(); + StringStringDict::iterator p = _topics.begin(); while(p != _topics.end()) { assert(_topicIMap.find(p->first) == _topicIMap.end()); try { - installTopic("recreate", p->first, false); + installTopic("recreate", p->first, p->second, false); ++p; } catch(const Freeze::DBNotFoundException& ex) @@ -57,7 +59,7 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); out << ex; } - StringBoolDict::iterator tmp = p; + StringStringDict::iterator tmp = p; ++p; _topics.erase(tmp); } @@ -69,8 +71,10 @@ TopicManagerI::~TopicManagerI() } TopicPrx -TopicManagerI::create(const string& name, const Ice::Current&) +TopicManagerI::create(const string& name, const string& type, const Ice::Current&) { + validateType(type); + // TODO: reader/writer mutex IceUtil::Mutex::Lock sync(*this); @@ -83,15 +87,15 @@ TopicManagerI::create(const string& name, const Ice::Current&) throw ex; } - installTopic("create", name, true); - _topics.insert(make_pair(name, true)); + installTopic("create", name, type, true); + _topics.insert(make_pair(name, type)); // // The identity is the name of the Topic. // Ice::Identity id; id.name = name; - return TopicPrx::uncheckedCast(_adapter->createProxy(id)); + return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id)); } TopicPrx @@ -106,7 +110,7 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const { Ice::Identity id; id.name = name; - return TopicPrx::uncheckedCast(_adapter->createProxy(id)); + return TopicPrx::uncheckedCast(_topicAdapter->createProxy(id)); } NoSuchTopic ex; @@ -119,21 +123,20 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const // struct TransformToTopicDict : public std::unary_function<TopicIMap::value_type, TopicDict::value_type> { + TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) : + _adapter(adapter) + { + } - TransformToTopicDict(const Ice::ObjectAdapterPtr& adapter) : - _adapter(adapter) - { - } - - TopicDict::value_type - operator()(TopicIMap::value_type p) - { - Ice::Identity id; - id.name = p.first; - return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id))); - } + TopicDict::value_type + operator()(TopicIMap::value_type p) + { + Ice::Identity id; + id.name = p.first; + return TopicDict::value_type(p.first, TopicPrx::uncheckedCast(_adapter->createProxy(id))); + } - Ice::ObjectAdapterPtr _adapter; + Ice::ObjectAdapterPtr _adapter; }; TopicDict @@ -146,83 +149,12 @@ TopicManagerI::retrieveAll(const Ice::Current&) const TopicDict all; transform(_topicIMap.begin(), _topicIMap.end(), inserter(all, all.begin()), - TransformToTopicDict(_adapter)); + TransformToTopicDict(_topicAdapter)); return all; } void -TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&) -{ - IceUtil::Mutex::Lock sync(*this); - - Ice::Identity ident = subscriber->ice_getIdentity(); - if(_traceLevels->topicMgr > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); - out << "Subscribe: " << Ice::identityToString(ident); - if(_traceLevels->topicMgr > 1) - { - out << " QoS: "; - for(QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi) - { - if(qi != qos.begin()) - { - out << ','; - } - out << '[' << qi->first << "," << qi->second << ']'; - } - } - } - - // - // Ensure that the identities category refers to an existing - // channel. - // - TopicIMap::iterator elem = _topicIMap.find(ident.category); - if(elem == _topicIMap.end()) - { - NoSuchTopic ex; - ex.name = ident.category; - throw ex; - } - - // - // Subscribe to the topic. - // - elem->second->subscribe(subscriber, qos); -} - -void -TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) -{ - IceUtil::Mutex::Lock sync(*this); - - Ice::Identity ident = subscriber->ice_getIdentity(); - - if(_traceLevels->topicMgr > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat); - - out << "Unsubscribe: " << Ice::identityToString(ident); - } - - - TopicIMap::iterator elem = _topicIMap.find(ident.category); - if(elem != _topicIMap.end()) - { - elem->second->unsubscribe(subscriber); - } -} - -void -TopicManagerI::shutdown(const Ice::Current&) -{ - _flusher->stopFlushing(); - _communicator->shutdown(); -} - -void TopicManagerI::reap() { // @@ -251,7 +183,7 @@ TopicManagerI::reap() } void -TopicManagerI::installTopic(const std::string& message, const std::string& name, bool create) +TopicManagerI::installTopic(const string& message, const string& name, const string& type, bool create) { if(_traceLevels->topicMgr > 0) { @@ -259,21 +191,109 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name, out << message << ' ' << name; } + // + // Prepend "topic-" to the topic name in order to form a + // unique name for the Freeze database. Since the name we + // supply to openDB is also used as a filename, we call + // getDatabaseName to obtain a name with any questionable + // filename characters converted to hex. + // // TODO: instance - // TODO: reserved names? // TODO: failure? cleanup database? - Freeze::DBPtr db = _dbEnv->openDB(name, create); + // + string dbName = "topic-" + getDatabaseName(name); + Freeze::DBPtr db = _dbEnv->openDB(dbName, create); // // Create topic implementation // - TopicIPtr topicI = new TopicI(_adapter, _traceLevels, name, _factory, db); + TopicIPtr topicI = new TopicI(_publishAdapter, _traceLevels, name, type, _factory, db); // // The identity is the name of the Topic. // Ice::Identity id; id.name = name; - _adapter->add(topicI, id); + _topicAdapter->add(topicI, id); _topicIMap.insert(TopicIMap::value_type(name, topicI)); } + +void +TopicManagerI::validateType(const string& type) +{ + bool fail = false; + const string::size_type len = type.size(); + string::size_type pos = type.find("::"); + if(pos != 0) + { + fail = true; + } + + bool checkAlpha = false; + while(!fail && pos < len) + { + if(checkAlpha) + { + if(!isalpha(type[pos])) + { + fail = true; + } + else + { + checkAlpha = false; + } + } + else if(type[pos] == ':') + { + pos++; + if(pos == len || type[pos] != ':') + { + fail = true; + } + else + { + checkAlpha = true; + } + } + else if(!isalnum(type[pos])) + { + fail = true; + } + pos++; + } + + if(checkAlpha) // type ended with "::" + { + fail = true; + } + + if(fail) + { + InvalidType ex; + ex.type = type; + throw ex; + } +} + +string +TopicManagerI::getDatabaseName(const string& name) +{ + string result; + result.reserve(name.size()); + + for(string::size_type i = 0; i < name.size(); i++) + { + if(isalnum(name[i]) || name[i] == '.' || name[i] == '-' || name[i] == '_') + { + result.push_back(name[i]); + } + else + { + ostringstream ostr; + ostr << '%' << hex << static_cast<int>(name[i]); + result.append(ostr.str()); + } + } + + return result; +} |