diff options
Diffstat (limited to 'cpp/src/IceStorm')
-rw-r--r-- | cpp/src/IceStorm/Admin.cpp | 310 | ||||
-rw-r--r-- | cpp/src/IceStorm/BatchFlusher.cpp | 110 | ||||
-rw-r--r-- | cpp/src/IceStorm/Instance.cpp | 20 | ||||
-rw-r--r-- | cpp/src/IceStorm/Parser.cpp | 550 | ||||
-rw-r--r-- | cpp/src/IceStorm/Parser.h | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/Service.cpp | 60 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 504 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 14 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 422 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 484 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.cpp | 52 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicManagerI.h | 6 |
13 files changed, 1269 insertions, 1269 deletions
diff --git a/cpp/src/IceStorm/Admin.cpp b/cpp/src/IceStorm/Admin.cpp index bd175b818d3..62ab8958c0a 100644 --- a/cpp/src/IceStorm/Admin.cpp +++ b/cpp/src/IceStorm/Admin.cpp @@ -39,17 +39,17 @@ void Client::usage() { cerr << "Usage: " << appName() << " [options] [file...]\n"; - cerr << - "Options:\n" - "-h, --help Show this message.\n" - "-v, --version Display the Ice version.\n" - "-DNAME Define NAME as 1.\n" - "-DNAME=DEF Define NAME as DEF.\n" - "-UNAME Remove any definition for NAME.\n" - "-IDIR Put DIR in the include file search path.\n" - "-e COMMANDS Execute COMMANDS.\n" - "-d, --debug Print debug messages.\n" - ; + cerr << + "Options:\n" + "-h, --help Show this message.\n" + "-v, --version Display the Ice version.\n" + "-DNAME Define NAME as 1.\n" + "-DNAME=DEF Define NAME as DEF.\n" + "-UNAME Remove any definition for NAME.\n" + "-IDIR Put DIR in the include file search path.\n" + "-e COMMANDS Execute COMMANDS.\n" + "-d, --debug Print debug messages.\n" + ; } int @@ -71,64 +71,64 @@ Client::run(int argc, char* argv[]) vector<string> args; try { - args = opts.parse(argc, (const char**)argv); + args = opts.parse(argc, (const char**)argv); } catch(const IceUtil::BadOptException& e) { cerr << e.reason << endl; - usage(); - return EXIT_FAILURE; + usage(); + return EXIT_FAILURE; } if(opts.isSet("help")) { - usage(); - return EXIT_SUCCESS; + usage(); + return EXIT_SUCCESS; } if(opts.isSet("version")) { - cout << ICE_STRING_VERSION << endl; - return EXIT_SUCCESS; + cout << ICE_STRING_VERSION << endl; + return EXIT_SUCCESS; } if(opts.isSet("D")) { - vector<string> optargs = opts.argVec("D"); - for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) - { - cpp += " -D" + *i; - } + vector<string> optargs = opts.argVec("D"); + for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) + { + cpp += " -D" + *i; + } } if(opts.isSet("U")) { - vector<string> optargs = opts.argVec("U"); - for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) - { - cpp += " -U" + *i; - } + vector<string> optargs = opts.argVec("U"); + for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) + { + cpp += " -U" + *i; + } } if(opts.isSet("I")) { - vector<string> optargs = opts.argVec("I"); - for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) - { - cpp += " -I" + *i; - } + vector<string> optargs = opts.argVec("I"); + for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) + { + cpp += " -I" + *i; + } } if(opts.isSet("e")) { - vector<string> optargs = opts.argVec("e"); - for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) - { - commands += *i + ";"; - } + vector<string> optargs = opts.argVec("e"); + for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i) + { + commands += *i + ";"; + } } debug = opts.isSet("debug"); if(!args.empty() && !commands.empty()) { - cerr << appName() << ": `-e' option cannot be used if input files are given" << endl; - usage(); - return EXIT_FAILURE; + cerr << appName() << ": `-e' option cannot be used if input files are given" << endl; + usage(); + return EXIT_FAILURE; } // The complete set of Ice::Identity -> manager proxies. @@ -141,14 +141,14 @@ Client::run(int argc, char* argv[]) IceStorm::TopicManagerPrx defaultManager; if(!managerProxy.empty()) { - defaultManager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy(managerProxy)); - if(!defaultManager) - { - cerr << appName() << ": `" << managerProxy << "' is not running" << endl; - return EXIT_FAILURE; - } - managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type( - defaultManager->ice_getIdentity(), defaultManager)); + defaultManager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy(managerProxy)); + if(!defaultManager) + { + cerr << appName() << ": `" << managerProxy << "' is not running" << endl; + return EXIT_FAILURE; + } + managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type( + defaultManager->ice_getIdentity(), defaultManager)); } // @@ -156,134 +156,134 @@ Client::run(int argc, char* argv[]) // Ice::PropertyDict props = communicator()->getProperties()->getPropertiesForPrefix("IceStormAdmin.TopicManager."); { - for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p) - { - try - { - IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast( - communicator()->stringToProxy(p->second)); - managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type( - manager->ice_getIdentity(), manager)); - } - catch(const Ice::ProxyParseException&) - { - cerr << appName() << ": malformed proxy: " << p->second << endl; - return EXIT_FAILURE; - } - } - if(props.empty() && !defaultManager) - { - cerr << appName() << ": no manager proxies configured" << endl; - return EXIT_FAILURE; - } + for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p) + { + try + { + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast( + communicator()->stringToProxy(p->second)); + managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type( + manager->ice_getIdentity(), manager)); + } + catch(const Ice::ProxyParseException&) + { + cerr << appName() << ": malformed proxy: " << p->second << endl; + return EXIT_FAILURE; + } + } + if(props.empty() && !defaultManager) + { + cerr << appName() << ": no manager proxies configured" << endl; + return EXIT_FAILURE; + } - if(!defaultManager) - { - string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default"); - if(!managerProxy.empty()) - { - defaultManager = IceStorm::TopicManagerPrx::uncheckedCast( - communicator()->stringToProxy(managerProxy)); - } - else - { - defaultManager = managers.begin()->second; - } - } + if(!defaultManager) + { + string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default"); + if(!managerProxy.empty()) + { + defaultManager = IceStorm::TopicManagerPrx::uncheckedCast( + communicator()->stringToProxy(managerProxy)); + } + else + { + defaultManager = managers.begin()->second; + } + } } // Check slice checksums for each manager. { - for(map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = managers.begin(); p != managers.end(); - ++p) - { - try - { - Ice::SliceChecksumDict serverChecksums = p->second->getSliceChecksums(); - Ice::SliceChecksumDict localChecksums = Ice::sliceChecksums(); - for(Ice::SliceChecksumDict::const_iterator q = localChecksums.begin(); q != localChecksums.end(); ++q) - { - Ice::SliceChecksumDict::const_iterator r = serverChecksums.find(q->first); - if(r == serverChecksums.end()) - { - cerr << appName() << ": " << communicator()->identityToString(p->first) - << " is using unknown Slice type `" << q->first << "'" << endl; - } - else if(q->second != r->second) - { - cerr << appName() << ": " << communicator()->identityToString(p->first) - << " is using a different Slice definition of `" << q->first << "'" << endl; - } - } - } - catch(const Ice::Exception& ex) - { - cerr << communicator()->identityToString(p->first) << ": " << ex << endl; - } - } + for(map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = managers.begin(); p != managers.end(); + ++p) + { + try + { + Ice::SliceChecksumDict serverChecksums = p->second->getSliceChecksums(); + Ice::SliceChecksumDict localChecksums = Ice::sliceChecksums(); + for(Ice::SliceChecksumDict::const_iterator q = localChecksums.begin(); q != localChecksums.end(); ++q) + { + Ice::SliceChecksumDict::const_iterator r = serverChecksums.find(q->first); + if(r == serverChecksums.end()) + { + cerr << appName() << ": " << communicator()->identityToString(p->first) + << " is using unknown Slice type `" << q->first << "'" << endl; + } + else if(q->second != r->second) + { + cerr << appName() << ": " << communicator()->identityToString(p->first) + << " is using a different Slice definition of `" << q->first << "'" << endl; + } + } + } + catch(const Ice::Exception& ex) + { + cerr << communicator()->identityToString(p->first) << ": " << ex << endl; + } + } } - + ParserPtr p = Parser::createParser(communicator(), defaultManager, managers); int status = EXIT_SUCCESS; if(args.empty()) // No files given { - if(!commands.empty()) // Commands were given - { - int parseStatus = p->parse(commands, debug); - if(parseStatus == EXIT_FAILURE) - { - status = EXIT_FAILURE; - } - } - else // No commands, let's use standard input - { - p->showBanner(); + if(!commands.empty()) // Commands were given + { + int parseStatus = p->parse(commands, debug); + if(parseStatus == EXIT_FAILURE) + { + status = EXIT_FAILURE; + } + } + else // No commands, let's use standard input + { + p->showBanner(); - int parseStatus = p->parse(stdin, debug); - if(parseStatus == EXIT_FAILURE) - { - status = EXIT_FAILURE; - } - } + int parseStatus = p->parse(stdin, debug); + if(parseStatus == EXIT_FAILURE) + { + status = EXIT_FAILURE; + } + } } else // Process files given on the command line { - for(vector<string>::const_iterator i = args.begin(); i != args.end(); ++i) - { - ifstream test(i->c_str()); - if(!test) - { - cerr << appName() << ": can't open `" << *i << "' for reading: " << strerror(errno) << endl; - return EXIT_FAILURE; - } - test.close(); - - string cmd = cpp + " " + *i; + for(vector<string>::const_iterator i = args.begin(); i != args.end(); ++i) + { + ifstream test(i->c_str()); + if(!test) + { + cerr << appName() << ": can't open `" << *i << "' for reading: " << strerror(errno) << endl; + return EXIT_FAILURE; + } + test.close(); + + string cmd = cpp + " " + *i; #ifdef _WIN32 - FILE* cppHandle = _popen(cmd.c_str(), "r"); + FILE* cppHandle = _popen(cmd.c_str(), "r"); #else - FILE* cppHandle = popen(cmd.c_str(), "r"); + FILE* cppHandle = popen(cmd.c_str(), "r"); #endif - if(cppHandle == NULL) - { - cerr << appName() << ": can't run C++ preprocessor: " << strerror(errno) << endl; - return EXIT_FAILURE; - } - - int parseStatus = p->parse(cppHandle, debug); - + if(cppHandle == NULL) + { + cerr << appName() << ": can't run C++ preprocessor: " << strerror(errno) << endl; + return EXIT_FAILURE; + } + + int parseStatus = p->parse(cppHandle, debug); + #ifdef _WIN32 - _pclose(cppHandle); + _pclose(cppHandle); #else - pclose(cppHandle); + pclose(cppHandle); #endif - if(parseStatus == EXIT_FAILURE) - { - status = EXIT_FAILURE; - } - } + if(parseStatus == EXIT_FAILURE) + { + status = EXIT_FAILURE; + } + } } return status; diff --git a/cpp/src/IceStorm/BatchFlusher.cpp b/cpp/src/IceStorm/BatchFlusher.cpp index 699a145db87..d38c93ea649 100644 --- a/cpp/src/IceStorm/BatchFlusher.cpp +++ b/cpp/src/IceStorm/BatchFlusher.cpp @@ -24,8 +24,8 @@ using namespace std; BatchFlusher::BatchFlusher(const InstancePtr& instance) : _traceLevels(instance->traceLevels()), _flushTime(IceUtil::Time::milliSeconds( - max(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.Flush.Timeout", 1000), 100))), // Minimum of 100ms. + max(instance->properties()->getPropertyAsIntWithDefault( + "IceStorm.Flush.Timeout", 1000), 100))), // Minimum of 100ms. _destroy(false) { start(); @@ -45,7 +45,7 @@ BatchFlusher::add(const Ice::ObjectPrx& subscriber) // if(_subscribers.empty()) { - notify(); + notify(); } _subscribers.push_back(subscriber); } @@ -70,58 +70,58 @@ BatchFlusher::run() { for(;;) { - list<Ice::ObjectPrx> subscribers; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_destroy) - { - return; - } - if(_subscribers.empty()) - { - wait(); - } - else - { - timedWait(_flushTime); - } - if(_destroy) - { - return; - } - subscribers = _subscribers; - } + list<Ice::ObjectPrx> subscribers; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroy) + { + return; + } + if(_subscribers.empty()) + { + wait(); + } + else + { + timedWait(_flushTime); + } + if(_destroy) + { + return; + } + subscribers = _subscribers; + } - set<Ice::ConnectionPtr> flushSet; - for(list<Ice::ObjectPrx>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p) - { - Ice::ConnectionPtr connection = (*p)->ice_getCachedConnection(); - if(connection) - { - flushSet.insert(connection); - } - } - - for(set<Ice::ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q) - { - try - { - (*q)->flushBatchRequests(); - } - catch(const Ice::LocalException&) - { - // Ignore. - } - } - - // - // Trace after the flush so that the correct number of objects - // are displayed - // - if(_traceLevels->flush > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat); - out << "connections: " << flushSet.size() << " subscribers: " << subscribers.size(); - } + set<Ice::ConnectionPtr> flushSet; + for(list<Ice::ObjectPrx>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p) + { + Ice::ConnectionPtr connection = (*p)->ice_getCachedConnection(); + if(connection) + { + flushSet.insert(connection); + } + } + + for(set<Ice::ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q) + { + try + { + (*q)->flushBatchRequests(); + } + catch(const Ice::LocalException&) + { + // Ignore. + } + } + + // + // Trace after the flush so that the correct number of objects + // are displayed + // + if(_traceLevels->flush > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat); + out << "connections: " << flushSet.size() << " subscribers: " << subscribers.size(); + } } } diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp index a5bcb3628d6..dac531d34f0 100644 --- a/cpp/src/IceStorm/Instance.cpp +++ b/cpp/src/IceStorm/Instance.cpp @@ -28,23 +28,23 @@ Instance::Instance( _adapter(adapter), _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())), _discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault( - "IceStorm.Discard.Interval", 60))), // default one minute. + "IceStorm.Discard.Interval", 60))), // default one minute. // default one minute. _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Send.Timeout", 60 * 1000)) { try { - __setNoDelete(true); + __setNoDelete(true); - _batchFlusher = new BatchFlusher(this); - _subscriberPool = new SubscriberPool(this); + _batchFlusher = new BatchFlusher(this); + _subscriberPool = new SubscriberPool(this); } catch(...) { - shutdown(); - __setNoDelete(false); + shutdown(); + __setNoDelete(false); - throw; + throw; } __setNoDelete(false); } @@ -112,12 +112,12 @@ Instance::shutdown() { if(_batchFlusher) { - _batchFlusher->destroy(); - _batchFlusher->getThreadControl().join(); + _batchFlusher->destroy(); + _batchFlusher->getThreadControl().join(); } if(_subscriberPool) { - _subscriberPool->destroy(); + _subscriberPool->destroy(); } } diff --git a/cpp/src/IceStorm/Parser.cpp b/cpp/src/IceStorm/Parser.cpp index aed113d2aec..3933ce6bd5a 100644 --- a/cpp/src/IceStorm/Parser.cpp +++ b/cpp/src/IceStorm/Parser.cpp @@ -39,8 +39,8 @@ class UnknownManagerException : public Exception public: UnknownManagerException(const string& name, const char* file, int line) : - Exception(file, line), - name(name) + Exception(file, line), + name(name) { } @@ -51,18 +51,18 @@ public: virtual string ice_name() const { - return "UnknownManagerException"; + return "UnknownManagerException"; } virtual Exception* ice_clone() const { - return new UnknownManagerException(*this); + return new UnknownManagerException(*this); } virtual void ice_throw() const { - throw *this; + throw *this; } const string name; }; @@ -71,7 +71,7 @@ public: ParserPtr Parser::createParser(const CommunicatorPtr& communicator, const TopicManagerPrx& admin, - const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) + const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) { return new Parser(communicator, admin, managers); } @@ -102,18 +102,18 @@ Parser::create(const list<string>& args) try { - for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i) - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(*i, arg); - manager->create(arg); - } + for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i) + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(*i, arg); + manager->create(arg); + } } catch(const Exception& ex) { - ostringstream s; - s << ex; - error(s.str()); + ostringstream s; + s << ex; + error(s.str()); } } @@ -122,19 +122,19 @@ Parser::destroy(const list<string>& args) { try { - for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i) - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(*i, arg); - TopicPrx topic = manager->retrieve(arg); - topic->destroy(); - } + for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i) + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(*i, arg); + TopicPrx topic = manager->retrieve(arg); + topic->destroy(); + } } catch(const Exception& ex) { - ostringstream s; - s << ex; - error(s.str()); + ostringstream s; + s << ex; + error(s.str()); } } @@ -145,56 +145,56 @@ Parser::link(const list<string>& _args) if(args.size() < 2) { - error("`link' requires at least two arguments (type `help' for more info)"); - return; + error("`link' requires at least two arguments (type `help' for more info)"); + return; } try { - TopicPrx fromTopic; - TopicPrx toTopic; - - try - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); - fromTopic = manager->retrieve(arg); - } - catch(const IceStorm::NoSuchTopic&) - { - ostringstream s; - s << args.front() << ": topic doesn't exist"; - error(s.str()); - return; - } - args.pop_front(); - - try - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); - toTopic = manager->retrieve(arg); - } - catch(const IceStorm::NoSuchTopic&) - { - ostringstream s; - s << args.front() << ": topic doesn't exist"; - error(s.str()); - return; - } - args.pop_front(); - Ice::Int cost = 0; - if(!args.empty()) - { - cost = atoi(args.front().c_str()); - } - fromTopic->link(toTopic, cost); + TopicPrx fromTopic; + TopicPrx toTopic; + + try + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); + fromTopic = manager->retrieve(arg); + } + catch(const IceStorm::NoSuchTopic&) + { + ostringstream s; + s << args.front() << ": topic doesn't exist"; + error(s.str()); + return; + } + args.pop_front(); + + try + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); + toTopic = manager->retrieve(arg); + } + catch(const IceStorm::NoSuchTopic&) + { + ostringstream s; + s << args.front() << ": topic doesn't exist"; + error(s.str()); + return; + } + args.pop_front(); + Ice::Int cost = 0; + if(!args.empty()) + { + cost = atoi(args.front().c_str()); + } + fromTopic->link(toTopic, cost); } catch(const Exception& ex) { - ostringstream s; - s << ex; - error(s.str()); + ostringstream s; + s << ex; + error(s.str()); } } @@ -205,51 +205,51 @@ Parser::unlink(const list<string>& _args) if(args.size() != 2) { - error("`unlink' requires exactly two arguments (type `help' for more info)"); - return; + error("`unlink' requires exactly two arguments (type `help' for more info)"); + return; } try { - TopicPrx fromTopic; - TopicPrx toTopic; - - try - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); - fromTopic = manager->retrieve(arg); - } - catch(const IceStorm::NoSuchTopic&) - { - ostringstream s; - s << args.front() << ": topic doesn't exist"; - error(s.str()); - return; - } - args.pop_front(); - - try - { - string arg; - IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); - toTopic = manager->retrieve(arg); - } - catch(const IceStorm::NoSuchTopic&) - { - ostringstream s; - s << args.front() << ": topic doesn't exist"; - error(s.str()); - return; - } - - fromTopic->unlink(toTopic); + TopicPrx fromTopic; + TopicPrx toTopic; + + try + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); + fromTopic = manager->retrieve(arg); + } + catch(const IceStorm::NoSuchTopic&) + { + ostringstream s; + s << args.front() << ": topic doesn't exist"; + error(s.str()); + return; + } + args.pop_front(); + + try + { + string arg; + IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg); + toTopic = manager->retrieve(arg); + } + catch(const IceStorm::NoSuchTopic&) + { + ostringstream s; + s << args.front() << ": topic doesn't exist"; + error(s.str()); + return; + } + + fromTopic->unlink(toTopic); } catch(const Exception& ex) { - ostringstream s; - s << ex; - error(s.str()); + ostringstream s; + s << ex; + error(s.str()); } } @@ -260,22 +260,22 @@ Parser::current(const list<string>& _args) if(args.size() == 0) { - cout << _communicator->identityToString(_defaultManager->ice_getIdentity()) << endl; - return; + cout << _communicator->identityToString(_defaultManager->ice_getIdentity()) << endl; + return; } try { - IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front()); - manager->ice_ping(); - _defaultManager = manager; + IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front()); + manager->ice_ping(); + _defaultManager = manager; } catch(const Exception& ex) { - ostringstream s; - s << args.front() << ": " << ex; - error(s.str()); - return; + ostringstream s; + s << args.front() << ": " << ex; + error(s.str()); + return; } } @@ -286,60 +286,60 @@ Parser::dolist(const list<string>& _args) try { - if(args.size() <= 1) - { - IceStorm::TopicManagerPrx manager; - if(args.size() == 1) - { - manager = findManagerByCategory(args.front()); - } - else - { - manager = _defaultManager; - } - TopicDict d = manager->retrieveAll(); - if(!d.empty()) - { - for(TopicDict::iterator i = d.begin(); i != d.end(); ++i) - { - if(i != d.begin()) - { - cout << ", "; - } - cout << i->first; - } - cout << endl; - } - } - else - { - IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front()); - args.pop_front(); - while(!args.empty()) - { - try - { - string arg = args.front(); - args.pop_front(); - TopicPrx topic = manager->retrieve(arg); - LinkInfoSeq links = topic->getLinkInfoSeq(); - for(LinkInfoSeq::const_iterator p = links.begin(); p != links.end(); ++p) - { - cout << "\t" << (*p).name << " with cost " << (*p).cost << endl; - } - } - catch(const NoSuchTopic&) - { - cout << "\tNo such topic" << endl; - } - } - } + if(args.size() <= 1) + { + IceStorm::TopicManagerPrx manager; + if(args.size() == 1) + { + manager = findManagerByCategory(args.front()); + } + else + { + manager = _defaultManager; + } + TopicDict d = manager->retrieveAll(); + if(!d.empty()) + { + for(TopicDict::iterator i = d.begin(); i != d.end(); ++i) + { + if(i != d.begin()) + { + cout << ", "; + } + cout << i->first; + } + cout << endl; + } + } + else + { + IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front()); + args.pop_front(); + while(!args.empty()) + { + try + { + string arg = args.front(); + args.pop_front(); + TopicPrx topic = manager->retrieve(arg); + LinkInfoSeq links = topic->getLinkInfoSeq(); + for(LinkInfoSeq::const_iterator p = links.begin(); p != links.end(); ++p) + { + cout << "\t" << (*p).name << " with cost " << (*p).cost << endl; + } + } + catch(const NoSuchTopic&) + { + cout << "\tNo such topic" << endl; + } + } + } } catch(const Exception& ex) { - ostringstream s; - s << ex; - error(s.str()); + ostringstream s; + s << ex; + error(s.str()); } } @@ -366,105 +366,105 @@ Parser::getInput(char* buf, int& result, int maxSize) { if(!_commands.empty()) { - if(_commands == ";") - { - result = 0; - } - else - { + if(_commands == ";") + { + result = 0; + } + else + { #if defined(_MSC_VER) && !defined(_STLP_MSVC) - // COMPILERBUG: Stupid Visual C++ defines min and max as macros - result = _MIN(maxSize, static_cast<int>(_commands.length())); + // COMPILERBUG: Stupid Visual C++ defines min and max as macros + result = _MIN(maxSize, static_cast<int>(_commands.length())); #else - result = min(maxSize, static_cast<int>(_commands.length())); + result = min(maxSize, static_cast<int>(_commands.length())); #endif - strncpy(buf, _commands.c_str(), result); - _commands.erase(0, result); - if(_commands.empty()) - { - _commands = ";"; - } - } + strncpy(buf, _commands.c_str(), result); + _commands.erase(0, result); + if(_commands.empty()) + { + _commands = ";"; + } + } } else if(isatty(fileno(yyin))) { #ifdef HAVE_READLINE const char* prompt = parser->getPrompt(); - char* line = readline(const_cast<char*>(prompt)); - if(!line) - { - result = 0; - } - else - { - if(*line) - { - add_history(line); - } - - result = strlen(line) + 1; - if(result > maxSize) - { - free(line); - error("input line too long"); - result = 0; - } - else - { - strcpy(buf, line); - strcat(buf, "\n"); - free(line); - } - } + char* line = readline(const_cast<char*>(prompt)); + if(!line) + { + result = 0; + } + else + { + if(*line) + { + add_history(line); + } + + result = strlen(line) + 1; + if(result > maxSize) + { + free(line); + error("input line too long"); + result = 0; + } + else + { + strcpy(buf, line); + strcat(buf, "\n"); + free(line); + } + } #else - cout << parser->getPrompt() << flush; - - string line; - while(true) - { - char c = static_cast<char>(getc(yyin)); - if(c == EOF) - { - if(line.size()) - { - line += '\n'; - } - break; - } - - line += c; - - if(c == '\n') - { - break; - } - } - - result = (int) line.length(); - if(result > maxSize) - { - error("input line too long"); - buf[0] = EOF; - result = 1; - } - else - { - strcpy(buf, line.c_str()); - } + cout << parser->getPrompt() << flush; + + string line; + while(true) + { + char c = static_cast<char>(getc(yyin)); + if(c == EOF) + { + if(line.size()) + { + line += '\n'; + } + break; + } + + line += c; + + if(c == '\n') + { + break; + } + } + + result = (int) line.length(); + if(result > maxSize) + { + error("input line too long"); + buf[0] = EOF; + result = 1; + } + else + { + strcpy(buf, line.c_str()); + } #endif } else { - if(((result = (int) fread(buf, 1, maxSize, yyin)) == 0) && ferror(yyin)) - { - error("input in flex scanner failed"); - buf[0] = EOF; - result = 1; - } + if(((result = (int) fread(buf, 1, maxSize, yyin)) == 0) && ferror(yyin)) + { + error("input in flex scanner failed"); + buf[0] = EOF; + result = 1; + } } } @@ -487,12 +487,12 @@ Parser::getPrompt() if(_continue) { - _continue = false; - return "(cont) "; + _continue = false; + return "(cont) "; } else { - return ">>> "; + return ">>> "; } } @@ -505,13 +505,13 @@ Parser::scanPosition(const char* s) idx = line.find("line"); if(idx != string::npos) { - line.erase(0, idx + 4); + line.erase(0, idx + 4); } idx = line.find_first_not_of(" \t\r#"); if(idx != string::npos) { - line.erase(0, idx); + line.erase(0, idx); } _currentLine = atoi(line.c_str()) - 1; @@ -519,24 +519,24 @@ Parser::scanPosition(const char* s) idx = line.find_first_of(" \t\r"); if(idx != string::npos) { - line.erase(0, idx); + line.erase(0, idx); } idx = line.find_first_not_of(" \t\r\""); if(idx != string::npos) { - line.erase(0, idx); + line.erase(0, idx); - idx = line.find_first_of(" \t\r\""); - if(idx != string::npos) - { - _currentFile = line.substr(0, idx); - line.erase(0, idx + 1); - } - else - { - _currentFile = line; - } + idx = line.find_first_of(" \t\r\""); + if(idx != string::npos) + { + _currentFile = line.substr(0, idx); + line.erase(0, idx + 1); + } + else + { + _currentFile = line; + } } } @@ -545,11 +545,11 @@ Parser::error(const char* s) { if(_commands.empty() && !isatty(fileno(yyin))) { - cerr << _currentFile << ':' << _currentLine << ": " << s << endl; + cerr << _currentFile << ':' << _currentLine << ": " << s << endl; } else { - cerr << "error: " << s << endl; + cerr << "error: " << s << endl; } _errors++; } @@ -565,11 +565,11 @@ Parser::warning(const char* s) { if(_commands.empty() && !isatty(fileno(yyin))) { - cerr << _currentFile << ':' << _currentLine << ": warning: " << s << endl; + cerr << _currentFile << ':' << _currentLine << ": warning: " << s << endl; } else { - cerr << "warning: " << s << endl; + cerr << "warning: " << s << endl; } } @@ -600,7 +600,7 @@ Parser::parse(FILE* file, bool debug) int status = yyparse(); if(_errors) { - status = EXIT_FAILURE; + status = EXIT_FAILURE; } parser = 0; @@ -628,7 +628,7 @@ Parser::parse(const std::string& commands, bool debug) int status = yyparse(); if(_errors) { - status = EXIT_FAILURE; + status = EXIT_FAILURE; } parser = 0; @@ -642,13 +642,13 @@ Parser::findManagerById(const string& full, string& arg) const arg = id.name; if(id.category.empty()) { - return _defaultManager; + return _defaultManager; } id.name = "TopicManager"; map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = _managers.find(id); if(p == _managers.end()) { - throw UnknownManagerException(id.category, __FILE__, __LINE__); + throw UnknownManagerException(id.category, __FILE__, __LINE__); } return p->second; } @@ -662,13 +662,13 @@ Parser::findManagerByCategory(const string& full) const map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = _managers.find(id); if(p == _managers.end()) { - throw UnknownManagerException(id.category, __FILE__, __LINE__); + throw UnknownManagerException(id.category, __FILE__, __LINE__); } return p->second; } Parser::Parser(const CommunicatorPtr& communicator, const TopicManagerPrx& admin, - const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) : + const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) : _communicator(communicator), _defaultManager(admin), _managers(managers) diff --git a/cpp/src/IceStorm/Parser.h b/cpp/src/IceStorm/Parser.h index be753288518..df8d77c0ea4 100644 --- a/cpp/src/IceStorm/Parser.h +++ b/cpp/src/IceStorm/Parser.h @@ -64,7 +64,7 @@ class Parser : public ::IceUtil::SimpleShared public: static ParserPtr createParser(const Ice::CommunicatorPtr&, const IceStorm::TopicManagerPrx&, - const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&); + const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&); void usage(); @@ -100,7 +100,7 @@ private: IceStorm::TopicManagerPrx findManagerByCategory(const std::string&) const; Parser(const Ice::CommunicatorPtr&, const IceStorm::TopicManagerPrx&, - const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&); + const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&); const Ice::CommunicatorPtr _communicator; IceStorm::TopicManagerPrx _defaultManager; diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp index be83f1c46a2..bf5b93951c6 100644 --- a/cpp/src/IceStorm/Service.cpp +++ b/cpp/src/IceStorm/Service.cpp @@ -32,15 +32,15 @@ public: virtual ~ServiceI(); virtual void start(const string&, - const CommunicatorPtr&, - const StringSeq&); + const CommunicatorPtr&, + const StringSeq&); virtual void start(const CommunicatorPtr&, - const ObjectAdapterPtr&, - const ObjectAdapterPtr&, - const string&, - const Ice::Identity&, - const string&); + const ObjectAdapterPtr&, + const ObjectAdapterPtr&, + const string&, + const Ice::Identity&, + const string&); virtual TopicManagerPrx getTopicManager() const; @@ -70,11 +70,11 @@ createIceStorm(CommunicatorPtr communicator) ServicePtr IceStorm::Service::create(const CommunicatorPtr& communicator, - const ObjectAdapterPtr& topicAdapter, - const ObjectAdapterPtr& publishAdapter, - const string& name, - const Ice::Identity& id, - const string& dbEnv) + const ObjectAdapterPtr& topicAdapter, + const ObjectAdapterPtr& publishAdapter, + const string& name, + const Ice::Identity& id, + const string& dbEnv) { ServiceI* service = new ServiceI; ServicePtr svc = service; @@ -113,26 +113,26 @@ IceStorm::ServiceI::start( try { - _manager = new TopicManagerI(_instance, _topicAdapter, name, "topics"); - _managerProxy = TopicManagerPrx::uncheckedCast(_topicAdapter->add(_manager, topicManagerId)); + _manager = new TopicManagerI(_instance, _topicAdapter, name, "topics"); + _managerProxy = TopicManagerPrx::uncheckedCast(_topicAdapter->add(_manager, topicManagerId)); } catch(const Ice::Exception&) { - _instance = 0; - throw; + _instance = 0; + throw; } - + _topicAdapter->activate(); _publishAdapter->activate(); } void IceStorm::ServiceI::start(const CommunicatorPtr& communicator, - const ObjectAdapterPtr& topicAdapter, - const ObjectAdapterPtr& publishAdapter, - const string& name, - const Ice::Identity& id, - const string& dbEnv) + const ObjectAdapterPtr& topicAdapter, + const ObjectAdapterPtr& publishAdapter, + const string& name, + const Ice::Identity& id, + const string& dbEnv) { string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm"); _instance = new Instance(instanceName, name, communicator, publishAdapter); @@ -142,13 +142,13 @@ IceStorm::ServiceI::start(const CommunicatorPtr& communicator, // try { - _manager = new TopicManagerI(_instance, topicAdapter, dbEnv, "topics"); - _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager, id)); + _manager = new TopicManagerI(_instance, topicAdapter, dbEnv, "topics"); + _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager, id)); } catch(const Ice::Exception&) { - _instance = 0; - throw; + _instance = 0; + throw; } } @@ -163,20 +163,20 @@ IceStorm::ServiceI::stop() { if(_topicAdapter) { - _topicAdapter->deactivate(); + _topicAdapter->deactivate(); } if(_publishAdapter) { - _publishAdapter->deactivate(); + _publishAdapter->deactivate(); } if(_topicAdapter) { - _topicAdapter->waitForDeactivate(); + _topicAdapter->waitForDeactivate(); } if(_publishAdapter) { - _publishAdapter->waitForDeactivate(); + _publishAdapter->waitForDeactivate(); } // diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 9174eed5949..3162116566a 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -37,7 +37,7 @@ class PerSubscriberPublisherI : public Ice::BlobjectArray public: PerSubscriberPublisherI(const InstancePtr& instance) : - _instance(instance) + _instance(instance) { } @@ -48,36 +48,36 @@ public: void setSubscriber(const SubscriberPtr& subscriber) { - _subscriber = subscriber; + _subscriber = subscriber; } virtual bool ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - vector<Ice::Byte>&, - const Ice::Current& current) + vector<Ice::Byte>&, + const Ice::Current& current) { - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); + EventDataPtr event = new EventData( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); - Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); - EventDataSeq e; - e.push_back(event); - Subscriber::QueueState state = _subscriber->queue(false, e); + EventDataSeq e; + e.push_back(event); + Subscriber::QueueState state = _subscriber->queue(false, e); - if(state == Subscriber::QueueStateFlush) - { - _instance->subscriberPool()->flush(_subscriber); - } - return true; + if(state == Subscriber::QueueStateFlush) + { + _instance->subscriberPool()->flush(_subscriber); + } + return true; } private: @@ -188,7 +188,7 @@ SubscriberOneway::SubscriberOneway( if(_batch) { - _instance->batchFlusher()->add(_obj); + _instance->batchFlusher()->add(_obj); } } @@ -202,66 +202,66 @@ SubscriberOneway::flush() // if(_state == SubscriberStateError) { - return false; + return false; } assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); try { - // - // Get the current set of events, but release the lock before - // attempting to deliver the events. This allows other threads - // to add events in case we block (such as during connection - // establishment). - // - EventDataSeq v; - v.swap(_events); - sync.release(); - - // - // Deliver the events without holding the lock. - // - // If there are more than one event queued and we are not in - // batch sending mode then send the events as a batch and then - // flush immediately, otherwise send one at a time. - // - vector<Ice::Byte> dummy; - if(v.size() > 1 && !_batch) - { - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection(); - assert(conn); - conn->flushBatchRequests(); - } - else - { - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - } - - // - // Reacquire the lock before we check the queue again. - // - sync.acquire(); + // + // Get the current set of events, but release the lock before + // attempting to deliver the events. This allows other threads + // to add events in case we block (such as during connection + // establishment). + // + EventDataSeq v; + v.swap(_events); + sync.release(); + + // + // Deliver the events without holding the lock. + // + // If there are more than one event queued and we are not in + // batch sending mode then send the events as a batch and then + // flush immediately, otherwise send one at a time. + // + vector<Ice::Byte> dummy; + if(v.size() > 1 && !_batch) + { + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } + Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection(); + assert(conn); + conn->flushBatchRequests(); + } + else + { + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } + } + + // + // Reacquire the lock before we check the queue again. + // + sync.acquire(); } catch(const Ice::LocalException& ex) { - assert(!sync.acquired()); - // error will re-acquire and release the lock. - error(ex); - return false; + assert(!sync.acquired()); + // error will re-acquire and release the lock. + error(ex); + return false; } if(!_events.empty()) { - assert(_state == SubscriberStateFlushPending); - return true; + assert(_state == SubscriberStateFlushPending); + return true; } _state = SubscriberStateOnline; return false; @@ -272,7 +272,7 @@ SubscriberOneway::destroy() { if(_batch) { - _instance->batchFlusher()->remove(_obj); + _instance->batchFlusher()->remove(_obj); } Subscriber::destroy(); } @@ -285,7 +285,7 @@ class TwowayInvokeI : public Ice::AMI_Object_ice_invoke public: TwowayInvokeI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } @@ -297,7 +297,7 @@ public: virtual void ice_exception(const Ice::Exception& e) { - _subscriber->error(e); + _subscriber->error(e); } private: @@ -326,7 +326,7 @@ SubscriberTwoway::flush() // if(_state == SubscriberStateError) { - return false; + return false; } assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); @@ -346,7 +346,7 @@ SubscriberTwoway::flush() // for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) { - _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); + _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); } // @@ -360,8 +360,8 @@ SubscriberTwoway::flush() // if(!_events.empty()) { - assert(_state == SubscriberStateFlushPending); - return true; + assert(_state == SubscriberStateFlushPending); + return true; } _state = SubscriberStateOnline; return false; @@ -375,20 +375,20 @@ class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke public: TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } virtual void ice_response(bool, const std::vector<Ice::Byte>&) { - _subscriber->response(); + _subscriber->response(); } virtual void ice_exception(const Ice::Exception& ex) { - _subscriber->error(ex); + _subscriber->error(ex); } private: @@ -412,20 +412,20 @@ SubscriberTwowayOrdered::flush() { EventDataPtr e; { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - e = _events.front(); - _events.erase(_events.begin()); + IceUtil::Mutex::Lock sync(_mutex); + + // + // If the subscriber errored out then we're done. + // + if(_state == SubscriberStateError) + { + return false; + } + assert(_state == SubscriberStateFlushPending); + assert(!_events.empty()); + + e = _events.front(); + _events.erase(_events.begin()); } _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); @@ -441,8 +441,8 @@ SubscriberTwowayOrdered::response() assert(_state != SubscriberStateError); if(_events.empty()) { - _state = SubscriberStateOnline; - return; + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -455,31 +455,31 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward public: Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : - _subscriber(subscriber) + _subscriber(subscriber) { } virtual void ice_response() { - _subscriber->response(); + _subscriber->response(); } virtual void ice_exception(const Ice::Exception& ex) { - try - { - ex.ice_throw(); - } - catch(const Ice::ObjectNotExistException& ex) - { - _subscriber->error(ex); - } - catch(const Ice::LocalException& ex) - { - _subscriber->offline(ex); - } + try + { + ex.ice_throw(); + } + catch(const Ice::ObjectNotExistException& ex) + { + _subscriber->error(ex); + } + catch(const Ice::LocalException& ex) + { + _subscriber->offline(ex); + } } private: @@ -505,7 +505,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) { if(forwarded) { - return QueueStateNoFlush; + return QueueStateNoFlush; } // @@ -518,7 +518,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) if(_state == SubscriberStateError) { - return QueueStateError; + return QueueStateError; } // @@ -527,53 +527,53 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events) // if(_state == SubscriberStateOffline) { - // - // If there are alot of subscribers offline then we will call - // Time::now() alot, which could be costly. This could be - // optimized to only one per event-batch by making the - // forwarded argument an EventInfo thing where the queue-time - // is lazy initialized. - // - if(IceUtil::Time::now() < _next) - { - return QueueStateNoFlush; - } - - // - // State transition to online. - // - _state = SubscriberStateOnline; + // + // If there are alot of subscribers offline then we will call + // Time::now() alot, which could be costly. This could be + // optimized to only one per event-batch by making the + // forwarded argument an EventInfo thing where the queue-time + // is lazy initialized. + // + if(IceUtil::Time::now() < _next) + { + return QueueStateNoFlush; + } + + // + // State transition to online. + // + _state = SubscriberStateOnline; } int queued = 0; for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p) { - if(_cost != 0) - { - // - // Note that we could calculate this cost once and cache - // it in a private form of the event to avoid this if this - // really is a performance problem (this could use the - // EventInfo thing discussed above). - // - int cost = 0; - Ice::Context::const_iterator q = (*p)->context.find("cost"); - if(q != (*p)->context.end()) - { - cost = atoi(q->second.c_str()); - } - if(cost > _cost) - { - continue; - } - } - ++queued; - _events.push_back(*p); + if(_cost != 0) + { + // + // Note that we could calculate this cost once and cache + // it in a private form of the event to avoid this if this + // really is a performance problem (this could use the + // EventInfo thing discussed above). + // + int cost = 0; + Ice::Context::const_iterator q = (*p)->context.find("cost"); + if(q != (*p)->context.end()) + { + cost = atoi(q->second.c_str()); + } + if(cost > _cost) + { + continue; + } + } + ++queued; + _events.push_back(*p); } if(_state == SubscriberStateFlushPending || queued == 0) { - return QueueStateNoFlush; + return QueueStateNoFlush; } _state = SubscriberStateFlushPending; return QueueStateFlush; @@ -584,20 +584,20 @@ SubscriberLink::flush() { EventDataSeq v; { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } + IceUtil::Mutex::Lock sync(_mutex); + + // + // If the subscriber errored out then we're done. + // + if(_state == SubscriberStateError) + { + return false; + } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - v.swap(_events); + assert(_state == SubscriberStateFlushPending); + assert(!_events.empty()); + + v.swap(_events); } _obj->forward_async(new Topiclink_forwardI(this), v); @@ -622,8 +622,8 @@ SubscriberLink::response() // if(_events.empty()) { - _state = SubscriberStateOnline; - return; + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -639,18 +639,18 @@ SubscriberLink::offline(const Ice::Exception& e) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(_warn) { - Ice::Warning warn(traceLevels->logger); - warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) - << ": link offline: " << e; + Ice::Warning warn(traceLevels->logger); + warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) + << ": link offline: " << e; } else { - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": link offline: " << e - << " discarding events: " << _instance->discardInterval() << "s"; - } + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": link offline: " << e + << " discarding events: " << _instance->discardInterval() << "s"; + } } _state = SubscriberStateOffline; @@ -675,57 +675,57 @@ Subscriber::create( try { - string reliability; - QoS::const_iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - } - if(!reliability.empty() && reliability != "ordered") - { - throw BadQoS("invalid reliability: " + reliability); - } - - // - // Override the timeout. - // - Ice::ObjectPrx newObj; - try - { - newObj = obj->ice_timeout(instance->sendTimeout()); - } - catch(const Ice::FixedProxyException&) - { - // - // In the event IceStorm is collocated this could be a - // fixed proxy in which case its not possible to set the - // timeout. - // - newObj = obj; - } - if(reliability == "ordered") - { - if(!newObj->ice_isTwoway()) - { - throw BadQoS("ordered reliability requires a twoway proxy"); - } - subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); - } - else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || - newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) - { - subscriber = new SubscriberOneway(instance, proxy, newObj); - } - else if(newObj->ice_isTwoway()) - { - subscriber = new SubscriberTwoway(instance, proxy, newObj); - } - per->setSubscriber(subscriber); + string reliability; + QoS::const_iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + } + if(!reliability.empty() && reliability != "ordered") + { + throw BadQoS("invalid reliability: " + reliability); + } + + // + // Override the timeout. + // + Ice::ObjectPrx newObj; + try + { + newObj = obj->ice_timeout(instance->sendTimeout()); + } + catch(const Ice::FixedProxyException&) + { + // + // In the event IceStorm is collocated this could be a + // fixed proxy in which case its not possible to set the + // timeout. + // + newObj = obj; + } + if(reliability == "ordered") + { + if(!newObj->ice_isTwoway()) + { + throw BadQoS("ordered reliability requires a twoway proxy"); + } + subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); + } + else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || + newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) + { + subscriber = new SubscriberOneway(instance, proxy, newObj); + } + else if(newObj->ice_isTwoway()) + { + subscriber = new SubscriberTwoway(instance, proxy, newObj); + } + per->setSubscriber(subscriber); } catch(const Ice::Exception&) { - instance->objectAdapter()->remove(proxy->ice_getIdentity()); - throw; + instance->objectAdapter()->remove(proxy->ice_getIdentity()); + throw; } return subscriber; @@ -738,9 +738,9 @@ Subscriber::create( int cost) { return new SubscriberLink( - instance, - TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), - cost); + instance, + TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), + cost); } Subscriber::~Subscriber() @@ -772,13 +772,13 @@ Subscriber::queue(bool, const EventDataSeq& events) if(_state == SubscriberStateError) { - return QueueStateError; + return QueueStateError; } copy(events.begin(), events.end(), back_inserter(_events)); if(_state == SubscriberStateFlushPending) { - return QueueStateNoFlush; + return QueueStateNoFlush; } _state = SubscriberStateFlushPending; @@ -793,18 +793,18 @@ Subscriber::destroy() // if(_proxy) { - try - { - _instance->objectAdapter()->remove(_proxy->ice_getIdentity()); - } - catch(const Ice::NotRegisteredException&) - { - // Ignore - } - catch(const Ice::ObjectAdapterDeactivatedException&) - { - // Ignore - } + try + { + _instance->objectAdapter()->remove(_proxy->ice_getIdentity()); + } + catch(const Ice::NotRegisteredException&) + { + // Ignore + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + // Ignore + } } } @@ -813,9 +813,9 @@ Subscriber::flushTime(const IceUtil::Time& interval) { if(_resetMax || interval > _maxSend) { - assert(interval != IceUtil::Time()); - _resetMax = false; - _maxSend = interval; + assert(interval != IceUtil::Time()); + _resetMax = false; + _maxSend = interval; } } @@ -833,15 +833,15 @@ Subscriber::error(const Ice::Exception& e) IceUtil::Mutex::Lock sync(_mutex); if(_state != SubscriberStateError) { - _state = SubscriberStateError; - _events.clear(); - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e; - } + _state = SubscriberStateError; + _events.clear(); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e; + } } } diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index bbe83750874..166362f1128 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -36,9 +36,9 @@ public: enum QueueState { - QueueStateError, - QueueStateFlush, - QueueStateNoFlush + QueueStateError, + QueueStateFlush, + QueueStateNoFlush }; virtual QueueState queue(bool, const std::vector<EventDataPtr>&); // @@ -70,10 +70,10 @@ protected: enum SubscriberState { - SubscriberStateOnline, - SubscriberStateFlushPending, - SubscriberStateOffline, - SubscriberStateError + SubscriberStateOnline, + SubscriberStateFlushPending, + SubscriberStateOffline, + SubscriberStateError }; SubscriberState _state; // The subscriber state. EventDataSeq _events; // The queue of events to send. diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index 42c92fd97cf..1d1fd773028 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -27,9 +27,9 @@ class SubscriberPoolWorker : public IceUtil::Thread public: SubscriberPoolWorker(const SubscriberPoolPtr& manager) : - _manager(manager) + _manager(manager) { - start(); + start(); } ~SubscriberPoolWorker() @@ -39,35 +39,35 @@ public: virtual void run() { - IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. - SubscriberPtr sub; - bool requeue = false; - bool computeInterval = false; - while(true) - { - _manager->dequeue(sub, requeue, interval, computeInterval); - if(!sub) - { - return; - } - - // - // If SubscriberPool returns true then the subscriber - // needs to be SubscriberPooled again, so therefore we - // will re-enqueue the subscriber in the call to dequeue. - // - if(computeInterval) - { - IceUtil::Time start = IceUtil::Time::now(); - requeue = sub->flush(); - interval = IceUtil::Time::now() - start; - } - else - { - requeue = sub->flush(); - interval = IceUtil::Time::seconds(24 * 60); // A long time. - } - } + IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time. + SubscriberPtr sub; + bool requeue = false; + bool computeInterval = false; + while(true) + { + _manager->dequeue(sub, requeue, interval, computeInterval); + if(!sub) + { + return; + } + + // + // If SubscriberPool returns true then the subscriber + // needs to be SubscriberPooled again, so therefore we + // will re-enqueue the subscriber in the call to dequeue. + // + if(computeInterval) + { + IceUtil::Time start = IceUtil::Time::now(); + requeue = sub->flush(); + interval = IceUtil::Time::now() - start; + } + else + { + requeue = sub->flush(); + interval = IceUtil::Time::seconds(24 * 60); // A long time. + } + } } private: @@ -96,32 +96,32 @@ SubscriberPoolMonitor::run() for(;;) { { - Lock sync(*this); - if(_destroyed) - { - return; - } + Lock sync(*this); + if(_destroyed) + { + return; + } - if(_needCheck) - { - timedWait(_timeout); - // - // Monitoring was stopped. - // - if(!_needCheck) - { - continue; - } - if(_destroyed) - { - return; - } - } - else - { - wait(); - continue; - } + if(_needCheck) + { + timedWait(_timeout); + // + // Monitoring was stopped. + // + if(!_needCheck) + { + continue; + } + if(_destroyed) + { + return; + } + } + else + { + wait(); + continue; + } } // // Call outside of the lock to prevent any deadlocks. @@ -163,7 +163,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : _size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)), // minimum 50ms, default 1s. _timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault( - "IceStorm.SubscriberPool.Timeout", 1000), 50))), + "IceStorm.SubscriberPool.Timeout", 1000), 50))), // 10 * the stall timeout. _stallCheck(_timeout * 10), _destroyed(false), @@ -171,23 +171,23 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : { try { - __setNoDelete(true); - _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); - for(unsigned int i = 0; i < _size; ++i) - { - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); - } + __setNoDelete(true); + _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout); + for(unsigned int i = 0; i < _size; ++i) + { + ++_inUse; + _workers.push_back(new SubscriberPoolWorker(this)); + } } catch(const IceUtil::Exception& ex) { - { - Ice::Error out(_traceLevels->logger); - out << "SubscriberPool: " << ex; - } - destroy(); - __setNoDelete(false); - throw; + { + Ice::Error out(_traceLevels->logger); + out << "SubscriberPool: " << ex; + } + destroy(); + __setNoDelete(false); + throw; } __setNoDelete(false); @@ -203,7 +203,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers) Lock sync(*this); if(_destroyed) { - return; + return; } // // Splice on the new set of subscribers to SubscriberPool. @@ -219,7 +219,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } _pending.push_back(subscriber); assert(invariants()); @@ -232,7 +232,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } _subscribers.push_back(subscriber); assert(invariants()); @@ -244,7 +244,7 @@ SubscriberPool::remove(const SubscriberPtr& subscriber) Lock sync(*this); if(_destroyed) { - return; + return; } // // Note that this cannot remove based on the subscriber id because @@ -267,18 +267,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: if(_destroyed) { - subscriber = 0; - return; + subscriber = 0; + return; } if(subscriber) { - if(requeue) - { - _pending.push_back(subscriber); - assert(invariants()); - } - subscriber->flushTime(interval); + if(requeue) + { + _pending.push_back(subscriber); + assert(invariants()); + } + subscriber->flushTime(interval); } // // Clear the reference. @@ -297,99 +297,99 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: if(_sizeMax != 1) { - // - // Reap dead workers, if necessary. - // - if(_reap > 0) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "reaping: " << _reap << " workers"; - } - list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); - while(p != _workers.end() && _reap > 0) - { - if(!(*p)->isAlive()) - { - (*p)->getThreadControl().join(); - p = _workers.erase(p); - --_reap; - } - else - { - ++p; - } - } - } - - // - // If we have extra workers every _stallCheck period we run - // through the complete set of subscribers and determine how - // many have stalled since the last check. If this number is - // less than the number of extra threads then we terminate the - // calling worker. - // - // - The flush time is protected by the subscriber pool mutex. - // - The flush time is only computed if we have extra threads, - // otherwise it is set to some large value. - // - The max flush time is reset to the next sending interval - // after after _stallCheck period. - // - Every subscriber is considered to be stalled iff it has - // never sent an event or we have just created the first - // additional worker. The first handles the case where a - // subscriber stalls for a long time on the first message - // send. The second means that we can disable computation of - // the flush latency if there are no additional threads. - // - if(_workers.size() > _size) - { - IceUtil::Time now = IceUtil::Time::now(); - if(now - _lastStallCheck > _stallCheck) - { - _lastStallCheck = now; - unsigned int stalls = 0; - for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) - { - if((*p)->pollMaxFlushTime(now) > _timeout) - { - ++stalls; - } - } - - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "checking stalls. extra workers: " << _workers.size() - _size - << " subscribers: " << _subscribers.size() << " stalls: " << stalls; - } - - if((_workers.size() - _size) > stalls) - { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "destroying workers"; - } - ++_reap; - return; - } - } - } + // + // Reap dead workers, if necessary. + // + if(_reap > 0) + { + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "reaping: " << _reap << " workers"; + } + list<IceUtil::ThreadPtr>::iterator p = _workers.begin(); + while(p != _workers.end() && _reap > 0) + { + if(!(*p)->isAlive()) + { + (*p)->getThreadControl().join(); + p = _workers.erase(p); + --_reap; + } + else + { + ++p; + } + } + } + + // + // If we have extra workers every _stallCheck period we run + // through the complete set of subscribers and determine how + // many have stalled since the last check. If this number is + // less than the number of extra threads then we terminate the + // calling worker. + // + // - The flush time is protected by the subscriber pool mutex. + // - The flush time is only computed if we have extra threads, + // otherwise it is set to some large value. + // - The max flush time is reset to the next sending interval + // after after _stallCheck period. + // - Every subscriber is considered to be stalled iff it has + // never sent an event or we have just created the first + // additional worker. The first handles the case where a + // subscriber stalls for a long time on the first message + // send. The second means that we can disable computation of + // the flush latency if there are no additional threads. + // + if(_workers.size() > _size) + { + IceUtil::Time now = IceUtil::Time::now(); + if(now - _lastStallCheck > _stallCheck) + { + _lastStallCheck = now; + unsigned int stalls = 0; + for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p) + { + if((*p)->pollMaxFlushTime(now) > _timeout) + { + ++stalls; + } + } + + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "checking stalls. extra workers: " << _workers.size() - _size + << " subscribers: " << _subscribers.size() << " stalls: " << stalls; + } + + if((_workers.size() - _size) > stalls) + { + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "destroying workers"; + } + ++_reap; + return; + } + } + } } - + while(_pending.empty() && !_destroyed) { - // - // If we wait then there is no need to monitor anymore. - // - _subscriberPoolMonitor->stopMonitor(); - wait(); + // + // If we wait then there is no need to monitor anymore. + // + _subscriberPoolMonitor->stopMonitor(); + wait(); } if(_destroyed) { - return; + return; } _lastDequeue = IceUtil::Time::now(); @@ -405,11 +405,11 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: // if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1)) { - _subscriberPoolMonitor->startMonitor(); + _subscriberPoolMonitor->startMonitor(); } else { - _subscriberPoolMonitor->stopMonitor(); + _subscriberPoolMonitor->stopMonitor(); } // // We only need to compute the push interval if we've created @@ -429,22 +429,22 @@ SubscriberPool::destroy() // _destroyed is set. // { - Lock sync(*this); - _destroyed = true; - notifyAll(); - if(_subscriberPoolMonitor) - { - _subscriberPoolMonitor->destroy(); - } - _subscribers.clear(); - _pending.clear(); + Lock sync(*this); + _destroyed = true; + notifyAll(); + if(_subscriberPoolMonitor) + { + _subscriberPoolMonitor->destroy(); + } + _subscribers.clear(); + _pending.clear(); } // // Next join with each worker. // for(list<IceUtil::ThreadPtr>::const_iterator p = _workers.begin(); p != _workers.end(); ++p) { - (*p)->getThreadControl().join(); + (*p)->getThreadControl().join(); } _workers.clear(); @@ -455,8 +455,8 @@ SubscriberPool::destroy() // if(_subscriberPoolMonitor) { - _subscriberPoolMonitor->getThreadControl().join(); - _subscriberPoolMonitor = 0; + _subscriberPoolMonitor->getThreadControl().join(); + _subscriberPoolMonitor = 0; } } @@ -466,7 +466,7 @@ SubscriberPool::check() Lock sync(*this); if(_destroyed) { - return; + return; } IceUtil::Time now = IceUtil::Time::now(); @@ -474,34 +474,34 @@ SubscriberPool::check() /* if(_traceLevels->subscriberPool > 1) { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "check called: interval: " << interval << " timeout: " << _timeout - << " pending: " << _pending.size() << " running: " << _workers.size() - << " sizeMax: " << _sizeMax; + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "check called: interval: " << interval << " timeout: " << _timeout + << " pending: " << _pending.size() << " running: " << _workers.size() + << " sizeMax: " << _sizeMax; } */ if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0)) { - if(_traceLevels->subscriberPool > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); - out << "detected stall: creating thread: threads: " << _workers.size(); - } - - // - // We'll now start stall checking at regular intervals if this - // is the first newly created worker. Here we need to - // initially set the stall check and the number of requests at - // this point. - // - if(_workers.size() == _size) - { - _lastStallCheck = now; - } - - ++_inUse; - _workers.push_back(new SubscriberPoolWorker(this)); + if(_traceLevels->subscriberPool > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat); + out << "detected stall: creating thread: threads: " << _workers.size(); + } + + // + // We'll now start stall checking at regular intervals if this + // is the first newly created worker. Here we need to + // initially set the stall check and the number of requests at + // this point. + // + if(_workers.size() == _size) + { + _lastStallCheck = now; + } + + ++_inUse; + _workers.push_back(new SubscriberPoolWorker(this)); } } @@ -512,14 +512,14 @@ SubscriberPool::invariants() list<SubscriberPtr>::const_iterator p; for(p = _subscribers.begin(); p != _subscribers.end(); ++p) { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); + assert(subs.find(*p) == subs.end()); + subs.insert(*p); } subs.clear(); for(p = _pending.begin(); p != _pending.end(); ++p) { - assert(subs.find(*p) == subs.end()); - subs.insert(*p); + assert(subs.find(*p) == subs.end()); + subs.insert(*p); } return true; } diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index a88bc1c1744..1831a746ac5 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -35,33 +35,33 @@ class PublisherI : public Ice::BlobjectArray public: PublisherI(const TopicIPtr& topic) : - _topic(topic) + _topic(topic) { } virtual bool ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, - Ice::ByteSeq&, - const Ice::Current& current) + Ice::ByteSeq&, + const Ice::Current& current) { - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); + EventDataPtr event = new EventData( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); - Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); - - EventDataSeq v; - v.push_back(event); - _topic->publish(false, v); + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); + + EventDataSeq v; + v.push_back(event); + _topic->publish(false, v); - return true; + return true; } private: @@ -78,14 +78,14 @@ class TopicLinkI : public TopicLink public: TopicLinkI(const TopicIPtr& topic) : - _topic(topic) + _topic(topic) { } virtual void forward(const EventDataSeq& v, const Ice::Current& current) { - _topic->publish(true, v); + _topic->publish(true, v); } private: @@ -128,17 +128,17 @@ TopicI::TopicI( Ice::Identity linkid; if(id.category.empty()) { - pubid.category = _name; - pubid.name = "publish"; - linkid.category = _name; - linkid.name = "link"; + pubid.category = _name; + pubid.name = "publish"; + linkid.category = _name; + linkid.name = "link"; } else { - pubid.category = id.category; - pubid.name = _name + ".publish"; - linkid.category = id.category; - linkid.name = _name + ".link"; + pubid.category = id.category; + pubid.name = _name + ".publish"; + linkid.category = id.category; + linkid.name = _name + ".link"; } _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid); @@ -149,20 +149,20 @@ TopicI::TopicI( // for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity()); - } - - // - // Create the subscriber object add it to the set of - // subscribers. - // - SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost); - _subscribers.push_back(subscriber); - _instance->subscriberPool()->add(subscriber); + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity()); + } + + // + // Create the subscriber object add it to the set of + // subscribers. + // + SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost); + _subscribers.push_back(subscriber); + _instance->subscriberPool()->add(subscriber); } } @@ -193,11 +193,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, { while(start != end) { - if(*start == ident) - { - return start; - } - ++start; + if(*start == ident) + { + return start; + } + ++start; } return end; } @@ -211,74 +211,74 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr QoS qos = origQoS; if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); - if(traceLevels->topic > 1) - { - out << " QoS: "; - for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) - { - if(p != qos.begin()) - { - out << ','; - } - out << '[' << p->first << "," << p->second << ']'; - } - } + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Subscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } } string reliability = "oneway"; { - QoS::iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - qos.erase(p); - } + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } } Ice::ObjectPrx newObj = obj; if(reliability == "batch") { - if(newObj->ice_isDatagram()) - { - newObj = newObj->ice_batchDatagram(); - } - else - { - newObj = newObj->ice_batchOneway(); - } + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } } else if(reliability == "twoway") { - newObj = newObj->ice_twoway(); + newObj = newObj->ice_twoway(); } else if(reliability == "twoway ordered") { - qos["reliability"] = "ordered"; - newObj = newObj->ice_twoway(); + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); } else // reliability == "oneway" { - if(reliability != "oneway" && traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << reliability <<" mode not understood."; - } - if(!newObj->ice_isDatagram()) - { - newObj = newObj->ice_oneway(); - } + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } } IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - (*p)->destroy(); - _instance->subscriberPool()->remove(*p); - _subscribers.erase(p); + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); } SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos); @@ -293,27 +293,27 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Subscribe: " << _instance->communicator()->identityToString(id); - if(traceLevels->topic > 1) - { - out << " QoS: "; - for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) - { - if(p != qos.begin()) - { - out << ','; - } - out << '[' << p->first << "," << p->second << ']'; - } - } + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Subscribe: " << _instance->communicator()->identityToString(id); + if(traceLevels->topic > 1) + { + out << " QoS: "; + for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p) + { + if(p != qos.begin()) + { + out << ','; + } + out << '[' << p->first << "," << p->second << ']'; + } + } } IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - throw AlreadySubscribed(); + throw AlreadySubscribed(); } SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); @@ -329,20 +329,20 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(!subscriber) { - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "unsubscribe with null subscriber."; - } - return; + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "unsubscribe with null subscriber."; + } + return; } Ice::Identity id = subscriber->ice_getIdentity(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "Unsubscribe: " << _instance->communicator()->identityToString(id); + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "Unsubscribe: " << _instance->communicator()->identityToString(id); } // @@ -367,7 +367,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } reap(); @@ -379,20 +379,20 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&) // link. for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p) { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - LinkExists ex; - ex.name = name; - throw ex; - } + if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) + { + LinkExists ex; + ex.name = name; + throw ex; + } } TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " link " << _instance->communicator()->identityToString(id) - << " cost " << cost; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " link " << _instance->communicator()->identityToString(id) + << " cost " << cost; } SubscriberPtr subscriber = Subscriber::create(_instance, link, cost); @@ -422,7 +422,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } reap(); @@ -433,24 +433,24 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) LinkRecordSeq::iterator p = _topicRecord.begin(); while(p != _topicRecord.end()) { - if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) - { - break; - } - ++p; + if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity()) + { + break; + } + ++p; } if(p == _topicRecord.end()) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << name << " failed - not linked"; - } + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << name << " failed - not linked"; + } - NoSuchLink ex; - ex.name = name; - throw ex; + NoSuchLink ex; + ex.name = name; + throw ex; } Ice::ObjectPrx subscriber = p->obj; @@ -464,8 +464,8 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _name << " unlink " << _instance->communicator()->identityToString(id); + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _name << " unlink " << _instance->communicator()->identityToString(id); } removeSubscriber(subscriber); } @@ -481,11 +481,11 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q) { - LinkInfo info; - info.name = identityToTopicName(q->theTopic->ice_getIdentity()); - info.cost = q->cost; - info.theTopic = q->theTopic; - seq.push_back(info); + LinkInfo info; + info.name = identityToTopicName(q->theTopic->ice_getIdentity()); + info.cost = q->cost; + info.theTopic = q->theTopic; + seq.push_back(info); } return seq; @@ -498,18 +498,18 @@ TopicI::destroy(const Ice::Current&) if(_destroyed) { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); + throw Ice::ObjectNotExistException(__FILE__, __LINE__); } _destroyed = true; try { - _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity()); - _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity()); + _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity()); + _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity()); } catch(const Ice::ObjectAdapterDeactivatedException&) { - // Ignore -- this could occur on shutdown. + // Ignore -- this could occur on shutdown. } } @@ -533,7 +533,7 @@ TopicI::reap() IceUtil::RecMutex::Lock topicSync(_topicRecordMutex); if(_destroyed) { - return; + return; } bool updated = false; @@ -543,46 +543,46 @@ TopicI::reap() // list<SubscriberPtr> error; { - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.swap(error); + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.swap(error); } TraceLevelsPtr traceLevels = _instance->traceLevels(); for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p) { - SubscriberPtr subscriber = *p; - assert(subscriber->persistent()); // Only persistent subscribers need to be reaped. - - bool found = false; - // - // If this turns out to be a performance problem then we - // can create an in memory map cache. - // - LinkRecordSeq::iterator q = _topicRecord.begin(); - while(q != _topicRecord.end()) - { - if(q->obj->ice_getIdentity() == subscriber->id()) - { - _topicRecord.erase(q); - updated = true; - found = true; - break; - } - ++q; - } - if(traceLevels->topic > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << "reaping " << _instance->communicator()->identityToString(subscriber->id()); - if(!found) - { - out << ": failed - not in database"; - } - } + SubscriberPtr subscriber = *p; + assert(subscriber->persistent()); // Only persistent subscribers need to be reaped. + + bool found = false; + // + // If this turns out to be a performance problem then we + // can create an in memory map cache. + // + LinkRecordSeq::iterator q = _topicRecord.begin(); + while(q != _topicRecord.end()) + { + if(q->obj->ice_getIdentity() == subscriber->id()) + { + _topicRecord.erase(q); + updated = true; + found = true; + break; + } + ++q; + } + if(traceLevels->topic > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << "reaping " << _instance->communicator()->identityToString(subscriber->id()); + if(!found) + { + out << ": failed - not in database"; + } + } } if(updated) { - _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); + _topics.put(PersistentTopicMap::value_type(_id, _topicRecord)); } } @@ -595,8 +595,8 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) // vector<SubscriberPtr> copy; { - IceUtil::Mutex::Lock sync(_subscribersMutex); - copy = _subscribers; + IceUtil::Mutex::Lock sync(_subscribersMutex); + copy = _subscribers; } // @@ -608,18 +608,18 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) list<SubscriberPtr> flush; for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p) { - Subscriber::QueueState state = (*p)->queue(forwarded, events); - switch(state) - { - case Subscriber::QueueStateError: - e.push_back((*p)->id()); - break; - case Subscriber::QueueStateFlush: - flush.push_back(*p); - break; - case Subscriber::QueueStateNoFlush: - break; - } + Subscriber::QueueState state = (*p)->queue(forwarded, events); + switch(state) + { + case Subscriber::QueueStateError: + e.push_back((*p)->id()); + break; + case Subscriber::QueueStateFlush: + flush.push_back(*p); + break; + case Subscriber::QueueStateNoFlush: + break; + } } // @@ -627,7 +627,7 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) // if(!flush.empty()) { - _instance->subscriberPool()->flush(flush); + _instance->subscriberPool()->flush(flush); } // @@ -637,48 +637,48 @@ TopicI::publish(bool forwarded, const EventDataSeq& events) list<SubscriberPtr> reap; if(!e.empty()) { - IceUtil::Mutex::Lock sync(_subscribersMutex); - for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) - { - // - // Its possible for the subscriber to already have been - // removed since the copy is iterated over outside of - // mutex protection. - // - // Note that although this could be quicker if we used a - // map, the most optimal case should be pushing around - // events not searching for a particular subscriber. - // - // The subscriber is immediately destroyed & removed from - // the _subscribers list. If the subscriber is persistent - // its added to an list of error'd subscribers and removed - // from the database on the next reap. - // - vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); - if(q != _subscribers.end()) - { - // - // Destroy the subscriber in any case. - // - (*q)->destroy(); - if((*q)->persistent()) - { - reap.push_back(*q); - } - _instance->subscriberPool()->remove(*q); - _subscribers.erase(q); - } - } + IceUtil::Mutex::Lock sync(_subscribersMutex); + for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep) + { + // + // Its possible for the subscriber to already have been + // removed since the copy is iterated over outside of + // mutex protection. + // + // Note that although this could be quicker if we used a + // map, the most optimal case should be pushing around + // events not searching for a particular subscriber. + // + // The subscriber is immediately destroyed & removed from + // the _subscribers list. If the subscriber is persistent + // its added to an list of error'd subscribers and removed + // from the database on the next reap. + // + vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep); + if(q != _subscribers.end()) + { + // + // Destroy the subscriber in any case. + // + (*q)->destroy(); + if((*q)->persistent()) + { + reap.push_back(*q); + } + _instance->subscriberPool()->remove(*q); + _subscribers.erase(q); + } + } } if(!reap.empty()) { - // - // This is why _error is a list, so we can splice on the - // reaped subscribers. - // - IceUtil::Mutex::Lock errorSync(_errorMutex); - _error.splice(_error.begin(), reap); + // + // This is why _error is a list, so we can splice on the + // reaped subscribers. + // + IceUtil::Mutex::Lock errorSync(_errorMutex); + _error.splice(_error.begin(), reap); } } @@ -691,10 +691,10 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj) vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) { - (*p)->destroy(); - _instance->subscriberPool()->remove(*p); - _subscribers.erase(p); - return; + (*p)->destroy(); + _instance->subscriberPool()->remove(*p); + _subscribers.erase(p); + return; } // @@ -703,7 +703,7 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj) TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topic > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicCat); - out << _instance->communicator()->identityToString(id) << ": not subscribed."; + Ice::Trace out(traceLevels->logger, traceLevels->topicCat); + out << _instance->communicator()->identityToString(id) << ": not subscribed."; } } diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h index 2ec61aee3c2..8db61085fa3 100644 --- a/cpp/src/IceStorm/TopicI.h +++ b/cpp/src/IceStorm/TopicI.h @@ -29,7 +29,7 @@ class TopicI : public TopicInternal public: TopicI(const InstancePtr&, const std::string&, const Ice::Identity&, const LinkRecordSeq&, const std::string&, - const std::string&); + const std::string&); ~TopicI(); virtual std::string getName(const Ice::Current&) const; diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp index 12c9706ceaa..f8ab5fe80bd 100644 --- a/cpp/src/IceStorm/TopicManagerI.cpp +++ b/cpp/src/IceStorm/TopicManagerI.cpp @@ -37,7 +37,7 @@ identityToTopicName(const Ice::Identity& id) // if(id.category.empty()) { - return id.name; + return id.name; } assert(id.name.length() > 6 && id.name.compare(0, 6, "topic.") == 0); @@ -63,7 +63,7 @@ TopicManagerI::TopicManagerI( // for(PersistentTopicMap::const_iterator p = _topics.begin(); p != _topics.end(); ++p) { - installTopic(identityToTopicName(p->first), p->first, p->second, false); + installTopic(identityToTopicName(p->first), p->first, p->second, false); } } @@ -80,7 +80,7 @@ TopicManagerI::create(const string& name, const Ice::Current&) if(_topicIMap.find(name) != _topicIMap.end()) { TopicExists ex; - ex.name = name; + ex.name = name; throw ex; } @@ -105,9 +105,9 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const TopicIMap::const_iterator p = _topicIMap.find(name); if(p == _topicIMap.end()) { - NoSuchTopic ex; - ex.name = name; - throw ex; + NoSuchTopic ex; + ex.name = name; + throw ex; } // Here we cannot just reconstruct the identity since the @@ -127,13 +127,13 @@ TopicManagerI::retrieveAll(const Ice::Current&) const TopicDict all; for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p) { - // - // Here we cannot just reconstruct the identity since the - // identity could be either "<instanceName>/topic.<topicname>" - // name, or if created with pre-3.2 IceStorm "/<topicname>". - // + // + // Here we cannot just reconstruct the identity since the + // identity could be either "<instanceName>/topic.<topicname>" + // name, or if created with pre-3.2 IceStorm "/<topicname>". + // all.insert(TopicDict::value_type( - p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id())))); + p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id())))); } return all; @@ -158,8 +158,8 @@ TopicManagerI::reap() { if(i->second->destroyed()) { - Ice::Identity id = i->second->id(); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + Ice::Identity id = i->second->id(); + TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); @@ -195,7 +195,7 @@ TopicManagerI::shutdown() for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p) { - p->second->reap(); + p->second->reap(); } } @@ -208,17 +208,17 @@ TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const L TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->topicMgr > 0) { - Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); - if(create) - { - out << "creating new topic \"" << name << "\". id: " - << _instance->communicator()->identityToString(id); - } - else - { - out << "loading topic \"" << name << "\" from database. id: " - << _instance->communicator()->identityToString(id); - } + Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat); + if(create) + { + out << "creating new topic \"" << name << "\". id: " + << _instance->communicator()->identityToString(id); + } + else + { + out << "loading topic \"" << name << "\" from database. id: " + << _instance->communicator()->identityToString(id); + } } // diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h index 61e6f30e223..ebddf8a9156 100644 --- a/cpp/src/IceStorm/TopicManagerI.h +++ b/cpp/src/IceStorm/TopicManagerI.h @@ -38,9 +38,9 @@ class TopicManagerI : public TopicManager, public IceUtil::Mutex public: TopicManagerI(const InstancePtr&, - const Ice::ObjectAdapterPtr&, - const std::string&, - const std::string&); + const Ice::ObjectAdapterPtr&, + const std::string&, + const std::string&); ~TopicManagerI(); virtual TopicPrx create(const std::string&, const Ice::Current&); |