summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm')
-rw-r--r--cpp/src/IceStorm/Admin.cpp310
-rw-r--r--cpp/src/IceStorm/BatchFlusher.cpp110
-rw-r--r--cpp/src/IceStorm/Instance.cpp20
-rw-r--r--cpp/src/IceStorm/Parser.cpp550
-rw-r--r--cpp/src/IceStorm/Parser.h4
-rw-r--r--cpp/src/IceStorm/Service.cpp60
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp504
-rw-r--r--cpp/src/IceStorm/Subscriber.h14
-rw-r--r--cpp/src/IceStorm/SubscriberPool.cpp422
-rw-r--r--cpp/src/IceStorm/TopicI.cpp484
-rw-r--r--cpp/src/IceStorm/TopicI.h2
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp52
-rw-r--r--cpp/src/IceStorm/TopicManagerI.h6
13 files changed, 1269 insertions, 1269 deletions
diff --git a/cpp/src/IceStorm/Admin.cpp b/cpp/src/IceStorm/Admin.cpp
index bd175b818d3..62ab8958c0a 100644
--- a/cpp/src/IceStorm/Admin.cpp
+++ b/cpp/src/IceStorm/Admin.cpp
@@ -39,17 +39,17 @@ void
Client::usage()
{
cerr << "Usage: " << appName() << " [options] [file...]\n";
- cerr <<
- "Options:\n"
- "-h, --help Show this message.\n"
- "-v, --version Display the Ice version.\n"
- "-DNAME Define NAME as 1.\n"
- "-DNAME=DEF Define NAME as DEF.\n"
- "-UNAME Remove any definition for NAME.\n"
- "-IDIR Put DIR in the include file search path.\n"
- "-e COMMANDS Execute COMMANDS.\n"
- "-d, --debug Print debug messages.\n"
- ;
+ cerr <<
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-v, --version Display the Ice version.\n"
+ "-DNAME Define NAME as 1.\n"
+ "-DNAME=DEF Define NAME as DEF.\n"
+ "-UNAME Remove any definition for NAME.\n"
+ "-IDIR Put DIR in the include file search path.\n"
+ "-e COMMANDS Execute COMMANDS.\n"
+ "-d, --debug Print debug messages.\n"
+ ;
}
int
@@ -71,64 +71,64 @@ Client::run(int argc, char* argv[])
vector<string> args;
try
{
- args = opts.parse(argc, (const char**)argv);
+ args = opts.parse(argc, (const char**)argv);
}
catch(const IceUtil::BadOptException& e)
{
cerr << e.reason << endl;
- usage();
- return EXIT_FAILURE;
+ usage();
+ return EXIT_FAILURE;
}
if(opts.isSet("help"))
{
- usage();
- return EXIT_SUCCESS;
+ usage();
+ return EXIT_SUCCESS;
}
if(opts.isSet("version"))
{
- cout << ICE_STRING_VERSION << endl;
- return EXIT_SUCCESS;
+ cout << ICE_STRING_VERSION << endl;
+ return EXIT_SUCCESS;
}
if(opts.isSet("D"))
{
- vector<string> optargs = opts.argVec("D");
- for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
- {
- cpp += " -D" + *i;
- }
+ vector<string> optargs = opts.argVec("D");
+ for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
+ {
+ cpp += " -D" + *i;
+ }
}
if(opts.isSet("U"))
{
- vector<string> optargs = opts.argVec("U");
- for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
- {
- cpp += " -U" + *i;
- }
+ vector<string> optargs = opts.argVec("U");
+ for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
+ {
+ cpp += " -U" + *i;
+ }
}
if(opts.isSet("I"))
{
- vector<string> optargs = opts.argVec("I");
- for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
- {
- cpp += " -I" + *i;
- }
+ vector<string> optargs = opts.argVec("I");
+ for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
+ {
+ cpp += " -I" + *i;
+ }
}
if(opts.isSet("e"))
{
- vector<string> optargs = opts.argVec("e");
- for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
- {
- commands += *i + ";";
- }
+ vector<string> optargs = opts.argVec("e");
+ for(vector<string>::const_iterator i = optargs.begin(); i != optargs.end(); ++i)
+ {
+ commands += *i + ";";
+ }
}
debug = opts.isSet("debug");
if(!args.empty() && !commands.empty())
{
- cerr << appName() << ": `-e' option cannot be used if input files are given" << endl;
- usage();
- return EXIT_FAILURE;
+ cerr << appName() << ": `-e' option cannot be used if input files are given" << endl;
+ usage();
+ return EXIT_FAILURE;
}
// The complete set of Ice::Identity -> manager proxies.
@@ -141,14 +141,14 @@ Client::run(int argc, char* argv[])
IceStorm::TopicManagerPrx defaultManager;
if(!managerProxy.empty())
{
- defaultManager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy(managerProxy));
- if(!defaultManager)
- {
- cerr << appName() << ": `" << managerProxy << "' is not running" << endl;
- return EXIT_FAILURE;
- }
- managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type(
- defaultManager->ice_getIdentity(), defaultManager));
+ defaultManager = IceStorm::TopicManagerPrx::checkedCast(communicator()->stringToProxy(managerProxy));
+ if(!defaultManager)
+ {
+ cerr << appName() << ": `" << managerProxy << "' is not running" << endl;
+ return EXIT_FAILURE;
+ }
+ managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type(
+ defaultManager->ice_getIdentity(), defaultManager));
}
//
@@ -156,134 +156,134 @@ Client::run(int argc, char* argv[])
//
Ice::PropertyDict props = communicator()->getProperties()->getPropertiesForPrefix("IceStormAdmin.TopicManager.");
{
- for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
- {
- try
- {
- IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(
- communicator()->stringToProxy(p->second));
- managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type(
- manager->ice_getIdentity(), manager));
- }
- catch(const Ice::ProxyParseException&)
- {
- cerr << appName() << ": malformed proxy: " << p->second << endl;
- return EXIT_FAILURE;
- }
- }
- if(props.empty() && !defaultManager)
- {
- cerr << appName() << ": no manager proxies configured" << endl;
- return EXIT_FAILURE;
- }
+ for(Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
+ {
+ try
+ {
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::uncheckedCast(
+ communicator()->stringToProxy(p->second));
+ managers.insert(map<Ice::Identity, IceStorm::TopicManagerPrx>::value_type(
+ manager->ice_getIdentity(), manager));
+ }
+ catch(const Ice::ProxyParseException&)
+ {
+ cerr << appName() << ": malformed proxy: " << p->second << endl;
+ return EXIT_FAILURE;
+ }
+ }
+ if(props.empty() && !defaultManager)
+ {
+ cerr << appName() << ": no manager proxies configured" << endl;
+ return EXIT_FAILURE;
+ }
- if(!defaultManager)
- {
- string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default");
- if(!managerProxy.empty())
- {
- defaultManager = IceStorm::TopicManagerPrx::uncheckedCast(
- communicator()->stringToProxy(managerProxy));
- }
- else
- {
- defaultManager = managers.begin()->second;
- }
- }
+ if(!defaultManager)
+ {
+ string managerProxy = properties->getProperty("IceStormAdmin.TopicManager.Default");
+ if(!managerProxy.empty())
+ {
+ defaultManager = IceStorm::TopicManagerPrx::uncheckedCast(
+ communicator()->stringToProxy(managerProxy));
+ }
+ else
+ {
+ defaultManager = managers.begin()->second;
+ }
+ }
}
// Check slice checksums for each manager.
{
- for(map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = managers.begin(); p != managers.end();
- ++p)
- {
- try
- {
- Ice::SliceChecksumDict serverChecksums = p->second->getSliceChecksums();
- Ice::SliceChecksumDict localChecksums = Ice::sliceChecksums();
- for(Ice::SliceChecksumDict::const_iterator q = localChecksums.begin(); q != localChecksums.end(); ++q)
- {
- Ice::SliceChecksumDict::const_iterator r = serverChecksums.find(q->first);
- if(r == serverChecksums.end())
- {
- cerr << appName() << ": " << communicator()->identityToString(p->first)
- << " is using unknown Slice type `" << q->first << "'" << endl;
- }
- else if(q->second != r->second)
- {
- cerr << appName() << ": " << communicator()->identityToString(p->first)
- << " is using a different Slice definition of `" << q->first << "'" << endl;
- }
- }
- }
- catch(const Ice::Exception& ex)
- {
- cerr << communicator()->identityToString(p->first) << ": " << ex << endl;
- }
- }
+ for(map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = managers.begin(); p != managers.end();
+ ++p)
+ {
+ try
+ {
+ Ice::SliceChecksumDict serverChecksums = p->second->getSliceChecksums();
+ Ice::SliceChecksumDict localChecksums = Ice::sliceChecksums();
+ for(Ice::SliceChecksumDict::const_iterator q = localChecksums.begin(); q != localChecksums.end(); ++q)
+ {
+ Ice::SliceChecksumDict::const_iterator r = serverChecksums.find(q->first);
+ if(r == serverChecksums.end())
+ {
+ cerr << appName() << ": " << communicator()->identityToString(p->first)
+ << " is using unknown Slice type `" << q->first << "'" << endl;
+ }
+ else if(q->second != r->second)
+ {
+ cerr << appName() << ": " << communicator()->identityToString(p->first)
+ << " is using a different Slice definition of `" << q->first << "'" << endl;
+ }
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ cerr << communicator()->identityToString(p->first) << ": " << ex << endl;
+ }
+ }
}
-
+
ParserPtr p = Parser::createParser(communicator(), defaultManager, managers);
int status = EXIT_SUCCESS;
if(args.empty()) // No files given
{
- if(!commands.empty()) // Commands were given
- {
- int parseStatus = p->parse(commands, debug);
- if(parseStatus == EXIT_FAILURE)
- {
- status = EXIT_FAILURE;
- }
- }
- else // No commands, let's use standard input
- {
- p->showBanner();
+ if(!commands.empty()) // Commands were given
+ {
+ int parseStatus = p->parse(commands, debug);
+ if(parseStatus == EXIT_FAILURE)
+ {
+ status = EXIT_FAILURE;
+ }
+ }
+ else // No commands, let's use standard input
+ {
+ p->showBanner();
- int parseStatus = p->parse(stdin, debug);
- if(parseStatus == EXIT_FAILURE)
- {
- status = EXIT_FAILURE;
- }
- }
+ int parseStatus = p->parse(stdin, debug);
+ if(parseStatus == EXIT_FAILURE)
+ {
+ status = EXIT_FAILURE;
+ }
+ }
}
else // Process files given on the command line
{
- for(vector<string>::const_iterator i = args.begin(); i != args.end(); ++i)
- {
- ifstream test(i->c_str());
- if(!test)
- {
- cerr << appName() << ": can't open `" << *i << "' for reading: " << strerror(errno) << endl;
- return EXIT_FAILURE;
- }
- test.close();
-
- string cmd = cpp + " " + *i;
+ for(vector<string>::const_iterator i = args.begin(); i != args.end(); ++i)
+ {
+ ifstream test(i->c_str());
+ if(!test)
+ {
+ cerr << appName() << ": can't open `" << *i << "' for reading: " << strerror(errno) << endl;
+ return EXIT_FAILURE;
+ }
+ test.close();
+
+ string cmd = cpp + " " + *i;
#ifdef _WIN32
- FILE* cppHandle = _popen(cmd.c_str(), "r");
+ FILE* cppHandle = _popen(cmd.c_str(), "r");
#else
- FILE* cppHandle = popen(cmd.c_str(), "r");
+ FILE* cppHandle = popen(cmd.c_str(), "r");
#endif
- if(cppHandle == NULL)
- {
- cerr << appName() << ": can't run C++ preprocessor: " << strerror(errno) << endl;
- return EXIT_FAILURE;
- }
-
- int parseStatus = p->parse(cppHandle, debug);
-
+ if(cppHandle == NULL)
+ {
+ cerr << appName() << ": can't run C++ preprocessor: " << strerror(errno) << endl;
+ return EXIT_FAILURE;
+ }
+
+ int parseStatus = p->parse(cppHandle, debug);
+
#ifdef _WIN32
- _pclose(cppHandle);
+ _pclose(cppHandle);
#else
- pclose(cppHandle);
+ pclose(cppHandle);
#endif
- if(parseStatus == EXIT_FAILURE)
- {
- status = EXIT_FAILURE;
- }
- }
+ if(parseStatus == EXIT_FAILURE)
+ {
+ status = EXIT_FAILURE;
+ }
+ }
}
return status;
diff --git a/cpp/src/IceStorm/BatchFlusher.cpp b/cpp/src/IceStorm/BatchFlusher.cpp
index 699a145db87..d38c93ea649 100644
--- a/cpp/src/IceStorm/BatchFlusher.cpp
+++ b/cpp/src/IceStorm/BatchFlusher.cpp
@@ -24,8 +24,8 @@ using namespace std;
BatchFlusher::BatchFlusher(const InstancePtr& instance) :
_traceLevels(instance->traceLevels()),
_flushTime(IceUtil::Time::milliSeconds(
- max(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.Flush.Timeout", 1000), 100))), // Minimum of 100ms.
+ max(instance->properties()->getPropertyAsIntWithDefault(
+ "IceStorm.Flush.Timeout", 1000), 100))), // Minimum of 100ms.
_destroy(false)
{
start();
@@ -45,7 +45,7 @@ BatchFlusher::add(const Ice::ObjectPrx& subscriber)
//
if(_subscribers.empty())
{
- notify();
+ notify();
}
_subscribers.push_back(subscriber);
}
@@ -70,58 +70,58 @@ BatchFlusher::run()
{
for(;;)
{
- list<Ice::ObjectPrx> subscribers;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_destroy)
- {
- return;
- }
- if(_subscribers.empty())
- {
- wait();
- }
- else
- {
- timedWait(_flushTime);
- }
- if(_destroy)
- {
- return;
- }
- subscribers = _subscribers;
- }
+ list<Ice::ObjectPrx> subscribers;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ if(_subscribers.empty())
+ {
+ wait();
+ }
+ else
+ {
+ timedWait(_flushTime);
+ }
+ if(_destroy)
+ {
+ return;
+ }
+ subscribers = _subscribers;
+ }
- set<Ice::ConnectionPtr> flushSet;
- for(list<Ice::ObjectPrx>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
- {
- Ice::ConnectionPtr connection = (*p)->ice_getCachedConnection();
- if(connection)
- {
- flushSet.insert(connection);
- }
- }
-
- for(set<Ice::ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
- {
- try
- {
- (*q)->flushBatchRequests();
- }
- catch(const Ice::LocalException&)
- {
- // Ignore.
- }
- }
-
- //
- // Trace after the flush so that the correct number of objects
- // are displayed
- //
- if(_traceLevels->flush > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat);
- out << "connections: " << flushSet.size() << " subscribers: " << subscribers.size();
- }
+ set<Ice::ConnectionPtr> flushSet;
+ for(list<Ice::ObjectPrx>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
+ {
+ Ice::ConnectionPtr connection = (*p)->ice_getCachedConnection();
+ if(connection)
+ {
+ flushSet.insert(connection);
+ }
+ }
+
+ for(set<Ice::ConnectionPtr>::const_iterator q = flushSet.begin(); q != flushSet.end(); ++q)
+ {
+ try
+ {
+ (*q)->flushBatchRequests();
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
+ }
+ }
+
+ //
+ // Trace after the flush so that the correct number of objects
+ // are displayed
+ //
+ if(_traceLevels->flush > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->flushCat);
+ out << "connections: " << flushSet.size() << " subscribers: " << subscribers.size();
+ }
}
}
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp
index a5bcb3628d6..dac531d34f0 100644
--- a/cpp/src/IceStorm/Instance.cpp
+++ b/cpp/src/IceStorm/Instance.cpp
@@ -28,23 +28,23 @@ Instance::Instance(
_adapter(adapter),
_traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
_discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault(
- "IceStorm.Discard.Interval", 60))), // default one minute.
+ "IceStorm.Discard.Interval", 60))), // default one minute.
// default one minute.
_sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Send.Timeout", 60 * 1000))
{
try
{
- __setNoDelete(true);
+ __setNoDelete(true);
- _batchFlusher = new BatchFlusher(this);
- _subscriberPool = new SubscriberPool(this);
+ _batchFlusher = new BatchFlusher(this);
+ _subscriberPool = new SubscriberPool(this);
}
catch(...)
{
- shutdown();
- __setNoDelete(false);
+ shutdown();
+ __setNoDelete(false);
- throw;
+ throw;
}
__setNoDelete(false);
}
@@ -112,12 +112,12 @@ Instance::shutdown()
{
if(_batchFlusher)
{
- _batchFlusher->destroy();
- _batchFlusher->getThreadControl().join();
+ _batchFlusher->destroy();
+ _batchFlusher->getThreadControl().join();
}
if(_subscriberPool)
{
- _subscriberPool->destroy();
+ _subscriberPool->destroy();
}
}
diff --git a/cpp/src/IceStorm/Parser.cpp b/cpp/src/IceStorm/Parser.cpp
index aed113d2aec..3933ce6bd5a 100644
--- a/cpp/src/IceStorm/Parser.cpp
+++ b/cpp/src/IceStorm/Parser.cpp
@@ -39,8 +39,8 @@ class UnknownManagerException : public Exception
public:
UnknownManagerException(const string& name, const char* file, int line) :
- Exception(file, line),
- name(name)
+ Exception(file, line),
+ name(name)
{
}
@@ -51,18 +51,18 @@ public:
virtual string
ice_name() const
{
- return "UnknownManagerException";
+ return "UnknownManagerException";
}
virtual Exception*
ice_clone() const
{
- return new UnknownManagerException(*this);
+ return new UnknownManagerException(*this);
}
virtual void
ice_throw() const
{
- throw *this;
+ throw *this;
}
const string name;
};
@@ -71,7 +71,7 @@ public:
ParserPtr
Parser::createParser(const CommunicatorPtr& communicator, const TopicManagerPrx& admin,
- const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers)
+ const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers)
{
return new Parser(communicator, admin, managers);
}
@@ -102,18 +102,18 @@ Parser::create(const list<string>& args)
try
{
- for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i)
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(*i, arg);
- manager->create(arg);
- }
+ for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i)
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(*i, arg);
+ manager->create(arg);
+ }
}
catch(const Exception& ex)
{
- ostringstream s;
- s << ex;
- error(s.str());
+ ostringstream s;
+ s << ex;
+ error(s.str());
}
}
@@ -122,19 +122,19 @@ Parser::destroy(const list<string>& args)
{
try
{
- for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i)
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(*i, arg);
- TopicPrx topic = manager->retrieve(arg);
- topic->destroy();
- }
+ for(list<string>::const_iterator i = args.begin(); i != args.end() ; ++i)
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(*i, arg);
+ TopicPrx topic = manager->retrieve(arg);
+ topic->destroy();
+ }
}
catch(const Exception& ex)
{
- ostringstream s;
- s << ex;
- error(s.str());
+ ostringstream s;
+ s << ex;
+ error(s.str());
}
}
@@ -145,56 +145,56 @@ Parser::link(const list<string>& _args)
if(args.size() < 2)
{
- error("`link' requires at least two arguments (type `help' for more info)");
- return;
+ error("`link' requires at least two arguments (type `help' for more info)");
+ return;
}
try
{
- TopicPrx fromTopic;
- TopicPrx toTopic;
-
- try
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
- fromTopic = manager->retrieve(arg);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- ostringstream s;
- s << args.front() << ": topic doesn't exist";
- error(s.str());
- return;
- }
- args.pop_front();
-
- try
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
- toTopic = manager->retrieve(arg);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- ostringstream s;
- s << args.front() << ": topic doesn't exist";
- error(s.str());
- return;
- }
- args.pop_front();
- Ice::Int cost = 0;
- if(!args.empty())
- {
- cost = atoi(args.front().c_str());
- }
- fromTopic->link(toTopic, cost);
+ TopicPrx fromTopic;
+ TopicPrx toTopic;
+
+ try
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
+ fromTopic = manager->retrieve(arg);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ ostringstream s;
+ s << args.front() << ": topic doesn't exist";
+ error(s.str());
+ return;
+ }
+ args.pop_front();
+
+ try
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
+ toTopic = manager->retrieve(arg);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ ostringstream s;
+ s << args.front() << ": topic doesn't exist";
+ error(s.str());
+ return;
+ }
+ args.pop_front();
+ Ice::Int cost = 0;
+ if(!args.empty())
+ {
+ cost = atoi(args.front().c_str());
+ }
+ fromTopic->link(toTopic, cost);
}
catch(const Exception& ex)
{
- ostringstream s;
- s << ex;
- error(s.str());
+ ostringstream s;
+ s << ex;
+ error(s.str());
}
}
@@ -205,51 +205,51 @@ Parser::unlink(const list<string>& _args)
if(args.size() != 2)
{
- error("`unlink' requires exactly two arguments (type `help' for more info)");
- return;
+ error("`unlink' requires exactly two arguments (type `help' for more info)");
+ return;
}
try
{
- TopicPrx fromTopic;
- TopicPrx toTopic;
-
- try
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
- fromTopic = manager->retrieve(arg);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- ostringstream s;
- s << args.front() << ": topic doesn't exist";
- error(s.str());
- return;
- }
- args.pop_front();
-
- try
- {
- string arg;
- IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
- toTopic = manager->retrieve(arg);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- ostringstream s;
- s << args.front() << ": topic doesn't exist";
- error(s.str());
- return;
- }
-
- fromTopic->unlink(toTopic);
+ TopicPrx fromTopic;
+ TopicPrx toTopic;
+
+ try
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
+ fromTopic = manager->retrieve(arg);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ ostringstream s;
+ s << args.front() << ": topic doesn't exist";
+ error(s.str());
+ return;
+ }
+ args.pop_front();
+
+ try
+ {
+ string arg;
+ IceStorm::TopicManagerPrx manager = findManagerById(args.front(), arg);
+ toTopic = manager->retrieve(arg);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ {
+ ostringstream s;
+ s << args.front() << ": topic doesn't exist";
+ error(s.str());
+ return;
+ }
+
+ fromTopic->unlink(toTopic);
}
catch(const Exception& ex)
{
- ostringstream s;
- s << ex;
- error(s.str());
+ ostringstream s;
+ s << ex;
+ error(s.str());
}
}
@@ -260,22 +260,22 @@ Parser::current(const list<string>& _args)
if(args.size() == 0)
{
- cout << _communicator->identityToString(_defaultManager->ice_getIdentity()) << endl;
- return;
+ cout << _communicator->identityToString(_defaultManager->ice_getIdentity()) << endl;
+ return;
}
try
{
- IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front());
- manager->ice_ping();
- _defaultManager = manager;
+ IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front());
+ manager->ice_ping();
+ _defaultManager = manager;
}
catch(const Exception& ex)
{
- ostringstream s;
- s << args.front() << ": " << ex;
- error(s.str());
- return;
+ ostringstream s;
+ s << args.front() << ": " << ex;
+ error(s.str());
+ return;
}
}
@@ -286,60 +286,60 @@ Parser::dolist(const list<string>& _args)
try
{
- if(args.size() <= 1)
- {
- IceStorm::TopicManagerPrx manager;
- if(args.size() == 1)
- {
- manager = findManagerByCategory(args.front());
- }
- else
- {
- manager = _defaultManager;
- }
- TopicDict d = manager->retrieveAll();
- if(!d.empty())
- {
- for(TopicDict::iterator i = d.begin(); i != d.end(); ++i)
- {
- if(i != d.begin())
- {
- cout << ", ";
- }
- cout << i->first;
- }
- cout << endl;
- }
- }
- else
- {
- IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front());
- args.pop_front();
- while(!args.empty())
- {
- try
- {
- string arg = args.front();
- args.pop_front();
- TopicPrx topic = manager->retrieve(arg);
- LinkInfoSeq links = topic->getLinkInfoSeq();
- for(LinkInfoSeq::const_iterator p = links.begin(); p != links.end(); ++p)
- {
- cout << "\t" << (*p).name << " with cost " << (*p).cost << endl;
- }
- }
- catch(const NoSuchTopic&)
- {
- cout << "\tNo such topic" << endl;
- }
- }
- }
+ if(args.size() <= 1)
+ {
+ IceStorm::TopicManagerPrx manager;
+ if(args.size() == 1)
+ {
+ manager = findManagerByCategory(args.front());
+ }
+ else
+ {
+ manager = _defaultManager;
+ }
+ TopicDict d = manager->retrieveAll();
+ if(!d.empty())
+ {
+ for(TopicDict::iterator i = d.begin(); i != d.end(); ++i)
+ {
+ if(i != d.begin())
+ {
+ cout << ", ";
+ }
+ cout << i->first;
+ }
+ cout << endl;
+ }
+ }
+ else
+ {
+ IceStorm::TopicManagerPrx manager = findManagerByCategory(args.front());
+ args.pop_front();
+ while(!args.empty())
+ {
+ try
+ {
+ string arg = args.front();
+ args.pop_front();
+ TopicPrx topic = manager->retrieve(arg);
+ LinkInfoSeq links = topic->getLinkInfoSeq();
+ for(LinkInfoSeq::const_iterator p = links.begin(); p != links.end(); ++p)
+ {
+ cout << "\t" << (*p).name << " with cost " << (*p).cost << endl;
+ }
+ }
+ catch(const NoSuchTopic&)
+ {
+ cout << "\tNo such topic" << endl;
+ }
+ }
+ }
}
catch(const Exception& ex)
{
- ostringstream s;
- s << ex;
- error(s.str());
+ ostringstream s;
+ s << ex;
+ error(s.str());
}
}
@@ -366,105 +366,105 @@ Parser::getInput(char* buf, int& result, int maxSize)
{
if(!_commands.empty())
{
- if(_commands == ";")
- {
- result = 0;
- }
- else
- {
+ if(_commands == ";")
+ {
+ result = 0;
+ }
+ else
+ {
#if defined(_MSC_VER) && !defined(_STLP_MSVC)
- // COMPILERBUG: Stupid Visual C++ defines min and max as macros
- result = _MIN(maxSize, static_cast<int>(_commands.length()));
+ // COMPILERBUG: Stupid Visual C++ defines min and max as macros
+ result = _MIN(maxSize, static_cast<int>(_commands.length()));
#else
- result = min(maxSize, static_cast<int>(_commands.length()));
+ result = min(maxSize, static_cast<int>(_commands.length()));
#endif
- strncpy(buf, _commands.c_str(), result);
- _commands.erase(0, result);
- if(_commands.empty())
- {
- _commands = ";";
- }
- }
+ strncpy(buf, _commands.c_str(), result);
+ _commands.erase(0, result);
+ if(_commands.empty())
+ {
+ _commands = ";";
+ }
+ }
}
else if(isatty(fileno(yyin)))
{
#ifdef HAVE_READLINE
const char* prompt = parser->getPrompt();
- char* line = readline(const_cast<char*>(prompt));
- if(!line)
- {
- result = 0;
- }
- else
- {
- if(*line)
- {
- add_history(line);
- }
-
- result = strlen(line) + 1;
- if(result > maxSize)
- {
- free(line);
- error("input line too long");
- result = 0;
- }
- else
- {
- strcpy(buf, line);
- strcat(buf, "\n");
- free(line);
- }
- }
+ char* line = readline(const_cast<char*>(prompt));
+ if(!line)
+ {
+ result = 0;
+ }
+ else
+ {
+ if(*line)
+ {
+ add_history(line);
+ }
+
+ result = strlen(line) + 1;
+ if(result > maxSize)
+ {
+ free(line);
+ error("input line too long");
+ result = 0;
+ }
+ else
+ {
+ strcpy(buf, line);
+ strcat(buf, "\n");
+ free(line);
+ }
+ }
#else
- cout << parser->getPrompt() << flush;
-
- string line;
- while(true)
- {
- char c = static_cast<char>(getc(yyin));
- if(c == EOF)
- {
- if(line.size())
- {
- line += '\n';
- }
- break;
- }
-
- line += c;
-
- if(c == '\n')
- {
- break;
- }
- }
-
- result = (int) line.length();
- if(result > maxSize)
- {
- error("input line too long");
- buf[0] = EOF;
- result = 1;
- }
- else
- {
- strcpy(buf, line.c_str());
- }
+ cout << parser->getPrompt() << flush;
+
+ string line;
+ while(true)
+ {
+ char c = static_cast<char>(getc(yyin));
+ if(c == EOF)
+ {
+ if(line.size())
+ {
+ line += '\n';
+ }
+ break;
+ }
+
+ line += c;
+
+ if(c == '\n')
+ {
+ break;
+ }
+ }
+
+ result = (int) line.length();
+ if(result > maxSize)
+ {
+ error("input line too long");
+ buf[0] = EOF;
+ result = 1;
+ }
+ else
+ {
+ strcpy(buf, line.c_str());
+ }
#endif
}
else
{
- if(((result = (int) fread(buf, 1, maxSize, yyin)) == 0) && ferror(yyin))
- {
- error("input in flex scanner failed");
- buf[0] = EOF;
- result = 1;
- }
+ if(((result = (int) fread(buf, 1, maxSize, yyin)) == 0) && ferror(yyin))
+ {
+ error("input in flex scanner failed");
+ buf[0] = EOF;
+ result = 1;
+ }
}
}
@@ -487,12 +487,12 @@ Parser::getPrompt()
if(_continue)
{
- _continue = false;
- return "(cont) ";
+ _continue = false;
+ return "(cont) ";
}
else
{
- return ">>> ";
+ return ">>> ";
}
}
@@ -505,13 +505,13 @@ Parser::scanPosition(const char* s)
idx = line.find("line");
if(idx != string::npos)
{
- line.erase(0, idx + 4);
+ line.erase(0, idx + 4);
}
idx = line.find_first_not_of(" \t\r#");
if(idx != string::npos)
{
- line.erase(0, idx);
+ line.erase(0, idx);
}
_currentLine = atoi(line.c_str()) - 1;
@@ -519,24 +519,24 @@ Parser::scanPosition(const char* s)
idx = line.find_first_of(" \t\r");
if(idx != string::npos)
{
- line.erase(0, idx);
+ line.erase(0, idx);
}
idx = line.find_first_not_of(" \t\r\"");
if(idx != string::npos)
{
- line.erase(0, idx);
+ line.erase(0, idx);
- idx = line.find_first_of(" \t\r\"");
- if(idx != string::npos)
- {
- _currentFile = line.substr(0, idx);
- line.erase(0, idx + 1);
- }
- else
- {
- _currentFile = line;
- }
+ idx = line.find_first_of(" \t\r\"");
+ if(idx != string::npos)
+ {
+ _currentFile = line.substr(0, idx);
+ line.erase(0, idx + 1);
+ }
+ else
+ {
+ _currentFile = line;
+ }
}
}
@@ -545,11 +545,11 @@ Parser::error(const char* s)
{
if(_commands.empty() && !isatty(fileno(yyin)))
{
- cerr << _currentFile << ':' << _currentLine << ": " << s << endl;
+ cerr << _currentFile << ':' << _currentLine << ": " << s << endl;
}
else
{
- cerr << "error: " << s << endl;
+ cerr << "error: " << s << endl;
}
_errors++;
}
@@ -565,11 +565,11 @@ Parser::warning(const char* s)
{
if(_commands.empty() && !isatty(fileno(yyin)))
{
- cerr << _currentFile << ':' << _currentLine << ": warning: " << s << endl;
+ cerr << _currentFile << ':' << _currentLine << ": warning: " << s << endl;
}
else
{
- cerr << "warning: " << s << endl;
+ cerr << "warning: " << s << endl;
}
}
@@ -600,7 +600,7 @@ Parser::parse(FILE* file, bool debug)
int status = yyparse();
if(_errors)
{
- status = EXIT_FAILURE;
+ status = EXIT_FAILURE;
}
parser = 0;
@@ -628,7 +628,7 @@ Parser::parse(const std::string& commands, bool debug)
int status = yyparse();
if(_errors)
{
- status = EXIT_FAILURE;
+ status = EXIT_FAILURE;
}
parser = 0;
@@ -642,13 +642,13 @@ Parser::findManagerById(const string& full, string& arg) const
arg = id.name;
if(id.category.empty())
{
- return _defaultManager;
+ return _defaultManager;
}
id.name = "TopicManager";
map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = _managers.find(id);
if(p == _managers.end())
{
- throw UnknownManagerException(id.category, __FILE__, __LINE__);
+ throw UnknownManagerException(id.category, __FILE__, __LINE__);
}
return p->second;
}
@@ -662,13 +662,13 @@ Parser::findManagerByCategory(const string& full) const
map<Ice::Identity, IceStorm::TopicManagerPrx>::const_iterator p = _managers.find(id);
if(p == _managers.end())
{
- throw UnknownManagerException(id.category, __FILE__, __LINE__);
+ throw UnknownManagerException(id.category, __FILE__, __LINE__);
}
return p->second;
}
Parser::Parser(const CommunicatorPtr& communicator, const TopicManagerPrx& admin,
- const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) :
+ const map<Ice::Identity, IceStorm::TopicManagerPrx>& managers) :
_communicator(communicator),
_defaultManager(admin),
_managers(managers)
diff --git a/cpp/src/IceStorm/Parser.h b/cpp/src/IceStorm/Parser.h
index be753288518..df8d77c0ea4 100644
--- a/cpp/src/IceStorm/Parser.h
+++ b/cpp/src/IceStorm/Parser.h
@@ -64,7 +64,7 @@ class Parser : public ::IceUtil::SimpleShared
public:
static ParserPtr createParser(const Ice::CommunicatorPtr&, const IceStorm::TopicManagerPrx&,
- const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&);
+ const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&);
void usage();
@@ -100,7 +100,7 @@ private:
IceStorm::TopicManagerPrx findManagerByCategory(const std::string&) const;
Parser(const Ice::CommunicatorPtr&, const IceStorm::TopicManagerPrx&,
- const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&);
+ const std::map<Ice::Identity, IceStorm::TopicManagerPrx>&);
const Ice::CommunicatorPtr _communicator;
IceStorm::TopicManagerPrx _defaultManager;
diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp
index be83f1c46a2..bf5b93951c6 100644
--- a/cpp/src/IceStorm/Service.cpp
+++ b/cpp/src/IceStorm/Service.cpp
@@ -32,15 +32,15 @@ public:
virtual ~ServiceI();
virtual void start(const string&,
- const CommunicatorPtr&,
- const StringSeq&);
+ const CommunicatorPtr&,
+ const StringSeq&);
virtual void start(const CommunicatorPtr&,
- const ObjectAdapterPtr&,
- const ObjectAdapterPtr&,
- const string&,
- const Ice::Identity&,
- const string&);
+ const ObjectAdapterPtr&,
+ const ObjectAdapterPtr&,
+ const string&,
+ const Ice::Identity&,
+ const string&);
virtual TopicManagerPrx getTopicManager() const;
@@ -70,11 +70,11 @@ createIceStorm(CommunicatorPtr communicator)
ServicePtr
IceStorm::Service::create(const CommunicatorPtr& communicator,
- const ObjectAdapterPtr& topicAdapter,
- const ObjectAdapterPtr& publishAdapter,
- const string& name,
- const Ice::Identity& id,
- const string& dbEnv)
+ const ObjectAdapterPtr& topicAdapter,
+ const ObjectAdapterPtr& publishAdapter,
+ const string& name,
+ const Ice::Identity& id,
+ const string& dbEnv)
{
ServiceI* service = new ServiceI;
ServicePtr svc = service;
@@ -113,26 +113,26 @@ IceStorm::ServiceI::start(
try
{
- _manager = new TopicManagerI(_instance, _topicAdapter, name, "topics");
- _managerProxy = TopicManagerPrx::uncheckedCast(_topicAdapter->add(_manager, topicManagerId));
+ _manager = new TopicManagerI(_instance, _topicAdapter, name, "topics");
+ _managerProxy = TopicManagerPrx::uncheckedCast(_topicAdapter->add(_manager, topicManagerId));
}
catch(const Ice::Exception&)
{
- _instance = 0;
- throw;
+ _instance = 0;
+ throw;
}
-
+
_topicAdapter->activate();
_publishAdapter->activate();
}
void
IceStorm::ServiceI::start(const CommunicatorPtr& communicator,
- const ObjectAdapterPtr& topicAdapter,
- const ObjectAdapterPtr& publishAdapter,
- const string& name,
- const Ice::Identity& id,
- const string& dbEnv)
+ const ObjectAdapterPtr& topicAdapter,
+ const ObjectAdapterPtr& publishAdapter,
+ const string& name,
+ const Ice::Identity& id,
+ const string& dbEnv)
{
string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
_instance = new Instance(instanceName, name, communicator, publishAdapter);
@@ -142,13 +142,13 @@ IceStorm::ServiceI::start(const CommunicatorPtr& communicator,
//
try
{
- _manager = new TopicManagerI(_instance, topicAdapter, dbEnv, "topics");
- _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager, id));
+ _manager = new TopicManagerI(_instance, topicAdapter, dbEnv, "topics");
+ _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager, id));
}
catch(const Ice::Exception&)
{
- _instance = 0;
- throw;
+ _instance = 0;
+ throw;
}
}
@@ -163,20 +163,20 @@ IceStorm::ServiceI::stop()
{
if(_topicAdapter)
{
- _topicAdapter->deactivate();
+ _topicAdapter->deactivate();
}
if(_publishAdapter)
{
- _publishAdapter->deactivate();
+ _publishAdapter->deactivate();
}
if(_topicAdapter)
{
- _topicAdapter->waitForDeactivate();
+ _topicAdapter->waitForDeactivate();
}
if(_publishAdapter)
{
- _publishAdapter->waitForDeactivate();
+ _publishAdapter->waitForDeactivate();
}
//
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 9174eed5949..3162116566a 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -37,7 +37,7 @@ class PerSubscriberPublisherI : public Ice::BlobjectArray
public:
PerSubscriberPublisherI(const InstancePtr& instance) :
- _instance(instance)
+ _instance(instance)
{
}
@@ -48,36 +48,36 @@ public:
void
setSubscriber(const SubscriberPtr& subscriber)
{
- _subscriber = subscriber;
+ _subscriber = subscriber;
}
virtual bool
ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- vector<Ice::Byte>&,
- const Ice::Current& current)
+ vector<Ice::Byte>&,
+ const Ice::Current& current)
{
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
+ EventDataPtr event = new EventData(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
- Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
- EventDataSeq e;
- e.push_back(event);
- Subscriber::QueueState state = _subscriber->queue(false, e);
+ EventDataSeq e;
+ e.push_back(event);
+ Subscriber::QueueState state = _subscriber->queue(false, e);
- if(state == Subscriber::QueueStateFlush)
- {
- _instance->subscriberPool()->flush(_subscriber);
- }
- return true;
+ if(state == Subscriber::QueueStateFlush)
+ {
+ _instance->subscriberPool()->flush(_subscriber);
+ }
+ return true;
}
private:
@@ -188,7 +188,7 @@ SubscriberOneway::SubscriberOneway(
if(_batch)
{
- _instance->batchFlusher()->add(_obj);
+ _instance->batchFlusher()->add(_obj);
}
}
@@ -202,66 +202,66 @@ SubscriberOneway::flush()
//
if(_state == SubscriberStateError)
{
- return false;
+ return false;
}
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
try
{
- //
- // Get the current set of events, but release the lock before
- // attempting to deliver the events. This allows other threads
- // to add events in case we block (such as during connection
- // establishment).
- //
- EventDataSeq v;
- v.swap(_events);
- sync.release();
-
- //
- // Deliver the events without holding the lock.
- //
- // If there are more than one event queued and we are not in
- // batch sending mode then send the events as a batch and then
- // flush immediately, otherwise send one at a time.
- //
- vector<Ice::Byte> dummy;
- if(v.size() > 1 && !_batch)
- {
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection();
- assert(conn);
- conn->flushBatchRequests();
- }
- else
- {
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- }
-
- //
- // Reacquire the lock before we check the queue again.
- //
- sync.acquire();
+ //
+ // Get the current set of events, but release the lock before
+ // attempting to deliver the events. This allows other threads
+ // to add events in case we block (such as during connection
+ // establishment).
+ //
+ EventDataSeq v;
+ v.swap(_events);
+ sync.release();
+
+ //
+ // Deliver the events without holding the lock.
+ //
+ // If there are more than one event queued and we are not in
+ // batch sending mode then send the events as a batch and then
+ // flush immediately, otherwise send one at a time.
+ //
+ vector<Ice::Byte> dummy;
+ if(v.size() > 1 && !_batch)
+ {
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
+ Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection();
+ assert(conn);
+ conn->flushBatchRequests();
+ }
+ else
+ {
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
+ }
+
+ //
+ // Reacquire the lock before we check the queue again.
+ //
+ sync.acquire();
}
catch(const Ice::LocalException& ex)
{
- assert(!sync.acquired());
- // error will re-acquire and release the lock.
- error(ex);
- return false;
+ assert(!sync.acquired());
+ // error will re-acquire and release the lock.
+ error(ex);
+ return false;
}
if(!_events.empty())
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ assert(_state == SubscriberStateFlushPending);
+ return true;
}
_state = SubscriberStateOnline;
return false;
@@ -272,7 +272,7 @@ SubscriberOneway::destroy()
{
if(_batch)
{
- _instance->batchFlusher()->remove(_obj);
+ _instance->batchFlusher()->remove(_obj);
}
Subscriber::destroy();
}
@@ -285,7 +285,7 @@ class TwowayInvokeI : public Ice::AMI_Object_ice_invoke
public:
TwowayInvokeI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
@@ -297,7 +297,7 @@ public:
virtual void
ice_exception(const Ice::Exception& e)
{
- _subscriber->error(e);
+ _subscriber->error(e);
}
private:
@@ -326,7 +326,7 @@ SubscriberTwoway::flush()
//
if(_state == SubscriberStateError)
{
- return false;
+ return false;
}
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -346,7 +346,7 @@ SubscriberTwoway::flush()
//
for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
{
- _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
+ _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
}
//
@@ -360,8 +360,8 @@ SubscriberTwoway::flush()
//
if(!_events.empty())
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ assert(_state == SubscriberStateFlushPending);
+ return true;
}
_state = SubscriberStateOnline;
return false;
@@ -375,20 +375,20 @@ class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke
public:
TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
virtual void
ice_response(bool, const std::vector<Ice::Byte>&)
{
- _subscriber->response();
+ _subscriber->response();
}
virtual void
ice_exception(const Ice::Exception& ex)
{
- _subscriber->error(ex);
+ _subscriber->error(ex);
}
private:
@@ -412,20 +412,20 @@ SubscriberTwowayOrdered::flush()
{
EventDataPtr e;
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- e = _events.front();
- _events.erase(_events.begin());
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ //
+ // If the subscriber errored out then we're done.
+ //
+ if(_state == SubscriberStateError)
+ {
+ return false;
+ }
+ assert(_state == SubscriberStateFlushPending);
+ assert(!_events.empty());
+
+ e = _events.front();
+ _events.erase(_events.begin());
}
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
@@ -441,8 +441,8 @@ SubscriberTwowayOrdered::response()
assert(_state != SubscriberStateError);
if(_events.empty())
{
- _state = SubscriberStateOnline;
- return;
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -455,31 +455,31 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
public:
Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
virtual void
ice_response()
{
- _subscriber->response();
+ _subscriber->response();
}
virtual void
ice_exception(const Ice::Exception& ex)
{
- try
- {
- ex.ice_throw();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- _subscriber->error(ex);
- }
- catch(const Ice::LocalException& ex)
- {
- _subscriber->offline(ex);
- }
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ _subscriber->error(ex);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _subscriber->offline(ex);
+ }
}
private:
@@ -505,7 +505,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
{
if(forwarded)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
//
@@ -518,7 +518,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
if(_state == SubscriberStateError)
{
- return QueueStateError;
+ return QueueStateError;
}
//
@@ -527,53 +527,53 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
//
if(_state == SubscriberStateOffline)
{
- //
- // If there are alot of subscribers offline then we will call
- // Time::now() alot, which could be costly. This could be
- // optimized to only one per event-batch by making the
- // forwarded argument an EventInfo thing where the queue-time
- // is lazy initialized.
- //
- if(IceUtil::Time::now() < _next)
- {
- return QueueStateNoFlush;
- }
-
- //
- // State transition to online.
- //
- _state = SubscriberStateOnline;
+ //
+ // If there are alot of subscribers offline then we will call
+ // Time::now() alot, which could be costly. This could be
+ // optimized to only one per event-batch by making the
+ // forwarded argument an EventInfo thing where the queue-time
+ // is lazy initialized.
+ //
+ if(IceUtil::Time::now() < _next)
+ {
+ return QueueStateNoFlush;
+ }
+
+ //
+ // State transition to online.
+ //
+ _state = SubscriberStateOnline;
}
int queued = 0;
for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
{
- if(_cost != 0)
- {
- //
- // Note that we could calculate this cost once and cache
- // it in a private form of the event to avoid this if this
- // really is a performance problem (this could use the
- // EventInfo thing discussed above).
- //
- int cost = 0;
- Ice::Context::const_iterator q = (*p)->context.find("cost");
- if(q != (*p)->context.end())
- {
- cost = atoi(q->second.c_str());
- }
- if(cost > _cost)
- {
- continue;
- }
- }
- ++queued;
- _events.push_back(*p);
+ if(_cost != 0)
+ {
+ //
+ // Note that we could calculate this cost once and cache
+ // it in a private form of the event to avoid this if this
+ // really is a performance problem (this could use the
+ // EventInfo thing discussed above).
+ //
+ int cost = 0;
+ Ice::Context::const_iterator q = (*p)->context.find("cost");
+ if(q != (*p)->context.end())
+ {
+ cost = atoi(q->second.c_str());
+ }
+ if(cost > _cost)
+ {
+ continue;
+ }
+ }
+ ++queued;
+ _events.push_back(*p);
}
if(_state == SubscriberStateFlushPending || queued == 0)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
_state = SubscriberStateFlushPending;
return QueueStateFlush;
@@ -584,20 +584,20 @@ SubscriberLink::flush()
{
EventDataSeq v;
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ //
+ // If the subscriber errored out then we're done.
+ //
+ if(_state == SubscriberStateError)
+ {
+ return false;
+ }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- v.swap(_events);
+ assert(_state == SubscriberStateFlushPending);
+ assert(!_events.empty());
+
+ v.swap(_events);
}
_obj->forward_async(new Topiclink_forwardI(this), v);
@@ -622,8 +622,8 @@ SubscriberLink::response()
//
if(_events.empty())
{
- _state = SubscriberStateOnline;
- return;
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -639,18 +639,18 @@ SubscriberLink::offline(const Ice::Exception& e)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(_warn)
{
- Ice::Warning warn(traceLevels->logger);
- warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
- << ": link offline: " << e;
+ Ice::Warning warn(traceLevels->logger);
+ warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
+ << ": link offline: " << e;
}
else
{
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
- << " discarding events: " << _instance->discardInterval() << "s";
- }
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
+ << " discarding events: " << _instance->discardInterval() << "s";
+ }
}
_state = SubscriberStateOffline;
@@ -675,57 +675,57 @@ Subscriber::create(
try
{
- string reliability;
- QoS::const_iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- }
- if(!reliability.empty() && reliability != "ordered")
- {
- throw BadQoS("invalid reliability: " + reliability);
- }
-
- //
- // Override the timeout.
- //
- Ice::ObjectPrx newObj;
- try
- {
- newObj = obj->ice_timeout(instance->sendTimeout());
- }
- catch(const Ice::FixedProxyException&)
- {
- //
- // In the event IceStorm is collocated this could be a
- // fixed proxy in which case its not possible to set the
- // timeout.
- //
- newObj = obj;
- }
- if(reliability == "ordered")
- {
- if(!newObj->ice_isTwoway())
- {
- throw BadQoS("ordered reliability requires a twoway proxy");
- }
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
- }
- else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
- newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
- {
- subscriber = new SubscriberOneway(instance, proxy, newObj);
- }
- else if(newObj->ice_isTwoway())
- {
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
- }
- per->setSubscriber(subscriber);
+ string reliability;
+ QoS::const_iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ }
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
+
+ //
+ // Override the timeout.
+ //
+ Ice::ObjectPrx newObj;
+ try
+ {
+ newObj = obj->ice_timeout(instance->sendTimeout());
+ }
+ catch(const Ice::FixedProxyException&)
+ {
+ //
+ // In the event IceStorm is collocated this could be a
+ // fixed proxy in which case its not possible to set the
+ // timeout.
+ //
+ newObj = obj;
+ }
+ if(reliability == "ordered")
+ {
+ if(!newObj->ice_isTwoway())
+ {
+ throw BadQoS("ordered reliability requires a twoway proxy");
+ }
+ subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
+ }
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
+ newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
+ {
+ subscriber = new SubscriberOneway(instance, proxy, newObj);
+ }
+ else if(newObj->ice_isTwoway())
+ {
+ subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ }
+ per->setSubscriber(subscriber);
}
catch(const Ice::Exception&)
{
- instance->objectAdapter()->remove(proxy->ice_getIdentity());
- throw;
+ instance->objectAdapter()->remove(proxy->ice_getIdentity());
+ throw;
}
return subscriber;
@@ -738,9 +738,9 @@ Subscriber::create(
int cost)
{
return new SubscriberLink(
- instance,
- TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
- cost);
+ instance,
+ TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
+ cost);
}
Subscriber::~Subscriber()
@@ -772,13 +772,13 @@ Subscriber::queue(bool, const EventDataSeq& events)
if(_state == SubscriberStateError)
{
- return QueueStateError;
+ return QueueStateError;
}
copy(events.begin(), events.end(), back_inserter(_events));
if(_state == SubscriberStateFlushPending)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
_state = SubscriberStateFlushPending;
@@ -793,18 +793,18 @@ Subscriber::destroy()
//
if(_proxy)
{
- try
- {
- _instance->objectAdapter()->remove(_proxy->ice_getIdentity());
- }
- catch(const Ice::NotRegisteredException&)
- {
- // Ignore
- }
- catch(const Ice::ObjectAdapterDeactivatedException&)
- {
- // Ignore
- }
+ try
+ {
+ _instance->objectAdapter()->remove(_proxy->ice_getIdentity());
+ }
+ catch(const Ice::NotRegisteredException&)
+ {
+ // Ignore
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // Ignore
+ }
}
}
@@ -813,9 +813,9 @@ Subscriber::flushTime(const IceUtil::Time& interval)
{
if(_resetMax || interval > _maxSend)
{
- assert(interval != IceUtil::Time());
- _resetMax = false;
- _maxSend = interval;
+ assert(interval != IceUtil::Time());
+ _resetMax = false;
+ _maxSend = interval;
}
}
@@ -833,15 +833,15 @@ Subscriber::error(const Ice::Exception& e)
IceUtil::Mutex::Lock sync(_mutex);
if(_state != SubscriberStateError)
{
- _state = SubscriberStateError;
- _events.clear();
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e;
- }
+ _state = SubscriberStateError;
+ _events.clear();
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e;
+ }
}
}
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
index bbe83750874..166362f1128 100644
--- a/cpp/src/IceStorm/Subscriber.h
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -36,9 +36,9 @@ public:
enum QueueState
{
- QueueStateError,
- QueueStateFlush,
- QueueStateNoFlush
+ QueueStateError,
+ QueueStateFlush,
+ QueueStateNoFlush
};
virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
//
@@ -70,10 +70,10 @@ protected:
enum SubscriberState
{
- SubscriberStateOnline,
- SubscriberStateFlushPending,
- SubscriberStateOffline,
- SubscriberStateError
+ SubscriberStateOnline,
+ SubscriberStateFlushPending,
+ SubscriberStateOffline,
+ SubscriberStateError
};
SubscriberState _state; // The subscriber state.
EventDataSeq _events; // The queue of events to send.
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index 42c92fd97cf..1d1fd773028 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -27,9 +27,9 @@ class SubscriberPoolWorker : public IceUtil::Thread
public:
SubscriberPoolWorker(const SubscriberPoolPtr& manager) :
- _manager(manager)
+ _manager(manager)
{
- start();
+ start();
}
~SubscriberPoolWorker()
@@ -39,35 +39,35 @@ public:
virtual void
run()
{
- IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
- SubscriberPtr sub;
- bool requeue = false;
- bool computeInterval = false;
- while(true)
- {
- _manager->dequeue(sub, requeue, interval, computeInterval);
- if(!sub)
- {
- return;
- }
-
- //
- // If SubscriberPool returns true then the subscriber
- // needs to be SubscriberPooled again, so therefore we
- // will re-enqueue the subscriber in the call to dequeue.
- //
- if(computeInterval)
- {
- IceUtil::Time start = IceUtil::Time::now();
- requeue = sub->flush();
- interval = IceUtil::Time::now() - start;
- }
- else
- {
- requeue = sub->flush();
- interval = IceUtil::Time::seconds(24 * 60); // A long time.
- }
- }
+ IceUtil::Time interval = IceUtil::Time::seconds(24 * 60); // A long time.
+ SubscriberPtr sub;
+ bool requeue = false;
+ bool computeInterval = false;
+ while(true)
+ {
+ _manager->dequeue(sub, requeue, interval, computeInterval);
+ if(!sub)
+ {
+ return;
+ }
+
+ //
+ // If SubscriberPool returns true then the subscriber
+ // needs to be SubscriberPooled again, so therefore we
+ // will re-enqueue the subscriber in the call to dequeue.
+ //
+ if(computeInterval)
+ {
+ IceUtil::Time start = IceUtil::Time::now();
+ requeue = sub->flush();
+ interval = IceUtil::Time::now() - start;
+ }
+ else
+ {
+ requeue = sub->flush();
+ interval = IceUtil::Time::seconds(24 * 60); // A long time.
+ }
+ }
}
private:
@@ -96,32 +96,32 @@ SubscriberPoolMonitor::run()
for(;;)
{
{
- Lock sync(*this);
- if(_destroyed)
- {
- return;
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
- if(_needCheck)
- {
- timedWait(_timeout);
- //
- // Monitoring was stopped.
- //
- if(!_needCheck)
- {
- continue;
- }
- if(_destroyed)
- {
- return;
- }
- }
- else
- {
- wait();
- continue;
- }
+ if(_needCheck)
+ {
+ timedWait(_timeout);
+ //
+ // Monitoring was stopped.
+ //
+ if(!_needCheck)
+ {
+ continue;
+ }
+ if(_destroyed)
+ {
+ return;
+ }
+ }
+ else
+ {
+ wait();
+ continue;
+ }
}
//
// Call outside of the lock to prevent any deadlocks.
@@ -163,7 +163,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
_size(instance->properties()->getPropertyAsIntWithDefault("IceStorm.SubscriberPool.Size", 1)),
// minimum 50ms, default 1s.
_timeout(IceUtil::Time::milliSeconds(max(instance->properties()->getPropertyAsIntWithDefault(
- "IceStorm.SubscriberPool.Timeout", 1000), 50))),
+ "IceStorm.SubscriberPool.Timeout", 1000), 50))),
// 10 * the stall timeout.
_stallCheck(_timeout * 10),
_destroyed(false),
@@ -171,23 +171,23 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
{
try
{
- __setNoDelete(true);
- _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
- for(unsigned int i = 0; i < _size; ++i)
- {
- ++_inUse;
- _workers.push_back(new SubscriberPoolWorker(this));
- }
+ __setNoDelete(true);
+ _subscriberPoolMonitor = new SubscriberPoolMonitor(this, _timeout);
+ for(unsigned int i = 0; i < _size; ++i)
+ {
+ ++_inUse;
+ _workers.push_back(new SubscriberPoolWorker(this));
+ }
}
catch(const IceUtil::Exception& ex)
{
- {
- Ice::Error out(_traceLevels->logger);
- out << "SubscriberPool: " << ex;
- }
- destroy();
- __setNoDelete(false);
- throw;
+ {
+ Ice::Error out(_traceLevels->logger);
+ out << "SubscriberPool: " << ex;
+ }
+ destroy();
+ __setNoDelete(false);
+ throw;
}
__setNoDelete(false);
@@ -203,7 +203,7 @@ SubscriberPool::flush(list<SubscriberPtr>& subscribers)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
//
// Splice on the new set of subscribers to SubscriberPool.
@@ -219,7 +219,7 @@ SubscriberPool::flush(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
_pending.push_back(subscriber);
assert(invariants());
@@ -232,7 +232,7 @@ SubscriberPool::add(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
_subscribers.push_back(subscriber);
assert(invariants());
@@ -244,7 +244,7 @@ SubscriberPool::remove(const SubscriberPtr& subscriber)
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
//
// Note that this cannot remove based on the subscriber id because
@@ -267,18 +267,18 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
if(_destroyed)
{
- subscriber = 0;
- return;
+ subscriber = 0;
+ return;
}
if(subscriber)
{
- if(requeue)
- {
- _pending.push_back(subscriber);
- assert(invariants());
- }
- subscriber->flushTime(interval);
+ if(requeue)
+ {
+ _pending.push_back(subscriber);
+ assert(invariants());
+ }
+ subscriber->flushTime(interval);
}
//
// Clear the reference.
@@ -297,99 +297,99 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
if(_sizeMax != 1)
{
- //
- // Reap dead workers, if necessary.
- //
- if(_reap > 0)
- {
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "reaping: " << _reap << " workers";
- }
- list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
- while(p != _workers.end() && _reap > 0)
- {
- if(!(*p)->isAlive())
- {
- (*p)->getThreadControl().join();
- p = _workers.erase(p);
- --_reap;
- }
- else
- {
- ++p;
- }
- }
- }
-
- //
- // If we have extra workers every _stallCheck period we run
- // through the complete set of subscribers and determine how
- // many have stalled since the last check. If this number is
- // less than the number of extra threads then we terminate the
- // calling worker.
- //
- // - The flush time is protected by the subscriber pool mutex.
- // - The flush time is only computed if we have extra threads,
- // otherwise it is set to some large value.
- // - The max flush time is reset to the next sending interval
- // after after _stallCheck period.
- // - Every subscriber is considered to be stalled iff it has
- // never sent an event or we have just created the first
- // additional worker. The first handles the case where a
- // subscriber stalls for a long time on the first message
- // send. The second means that we can disable computation of
- // the flush latency if there are no additional threads.
- //
- if(_workers.size() > _size)
- {
- IceUtil::Time now = IceUtil::Time::now();
- if(now - _lastStallCheck > _stallCheck)
- {
- _lastStallCheck = now;
- unsigned int stalls = 0;
- for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
- {
- if((*p)->pollMaxFlushTime(now) > _timeout)
- {
- ++stalls;
- }
- }
-
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "checking stalls. extra workers: " << _workers.size() - _size
- << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
- }
-
- if((_workers.size() - _size) > stalls)
- {
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "destroying workers";
- }
- ++_reap;
- return;
- }
- }
- }
+ //
+ // Reap dead workers, if necessary.
+ //
+ if(_reap > 0)
+ {
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "reaping: " << _reap << " workers";
+ }
+ list<IceUtil::ThreadPtr>::iterator p = _workers.begin();
+ while(p != _workers.end() && _reap > 0)
+ {
+ if(!(*p)->isAlive())
+ {
+ (*p)->getThreadControl().join();
+ p = _workers.erase(p);
+ --_reap;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+ }
+
+ //
+ // If we have extra workers every _stallCheck period we run
+ // through the complete set of subscribers and determine how
+ // many have stalled since the last check. If this number is
+ // less than the number of extra threads then we terminate the
+ // calling worker.
+ //
+ // - The flush time is protected by the subscriber pool mutex.
+ // - The flush time is only computed if we have extra threads,
+ // otherwise it is set to some large value.
+ // - The max flush time is reset to the next sending interval
+ // after after _stallCheck period.
+ // - Every subscriber is considered to be stalled iff it has
+ // never sent an event or we have just created the first
+ // additional worker. The first handles the case where a
+ // subscriber stalls for a long time on the first message
+ // send. The second means that we can disable computation of
+ // the flush latency if there are no additional threads.
+ //
+ if(_workers.size() > _size)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ if(now - _lastStallCheck > _stallCheck)
+ {
+ _lastStallCheck = now;
+ unsigned int stalls = 0;
+ for(list<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ if((*p)->pollMaxFlushTime(now) > _timeout)
+ {
+ ++stalls;
+ }
+ }
+
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "checking stalls. extra workers: " << _workers.size() - _size
+ << " subscribers: " << _subscribers.size() << " stalls: " << stalls;
+ }
+
+ if((_workers.size() - _size) > stalls)
+ {
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "destroying workers";
+ }
+ ++_reap;
+ return;
+ }
+ }
+ }
}
-
+
while(_pending.empty() && !_destroyed)
{
- //
- // If we wait then there is no need to monitor anymore.
- //
- _subscriberPoolMonitor->stopMonitor();
- wait();
+ //
+ // If we wait then there is no need to monitor anymore.
+ //
+ _subscriberPoolMonitor->stopMonitor();
+ wait();
}
if(_destroyed)
{
- return;
+ return;
}
_lastDequeue = IceUtil::Time::now();
@@ -405,11 +405,11 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
//
if(_inUse == _workers.size() && (_workers.size() < _sizeMax || _sizeMax != 1))
{
- _subscriberPoolMonitor->startMonitor();
+ _subscriberPoolMonitor->startMonitor();
}
else
{
- _subscriberPoolMonitor->stopMonitor();
+ _subscriberPoolMonitor->stopMonitor();
}
//
// We only need to compute the push interval if we've created
@@ -429,22 +429,22 @@ SubscriberPool::destroy()
// _destroyed is set.
//
{
- Lock sync(*this);
- _destroyed = true;
- notifyAll();
- if(_subscriberPoolMonitor)
- {
- _subscriberPoolMonitor->destroy();
- }
- _subscribers.clear();
- _pending.clear();
+ Lock sync(*this);
+ _destroyed = true;
+ notifyAll();
+ if(_subscriberPoolMonitor)
+ {
+ _subscriberPoolMonitor->destroy();
+ }
+ _subscribers.clear();
+ _pending.clear();
}
//
// Next join with each worker.
//
for(list<IceUtil::ThreadPtr>::const_iterator p = _workers.begin(); p != _workers.end(); ++p)
{
- (*p)->getThreadControl().join();
+ (*p)->getThreadControl().join();
}
_workers.clear();
@@ -455,8 +455,8 @@ SubscriberPool::destroy()
//
if(_subscriberPoolMonitor)
{
- _subscriberPoolMonitor->getThreadControl().join();
- _subscriberPoolMonitor = 0;
+ _subscriberPoolMonitor->getThreadControl().join();
+ _subscriberPoolMonitor = 0;
}
}
@@ -466,7 +466,7 @@ SubscriberPool::check()
Lock sync(*this);
if(_destroyed)
{
- return;
+ return;
}
IceUtil::Time now = IceUtil::Time::now();
@@ -474,34 +474,34 @@ SubscriberPool::check()
/*
if(_traceLevels->subscriberPool > 1)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "check called: interval: " << interval << " timeout: " << _timeout
- << " pending: " << _pending.size() << " running: " << _workers.size()
- << " sizeMax: " << _sizeMax;
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "check called: interval: " << interval << " timeout: " << _timeout
+ << " pending: " << _pending.size() << " running: " << _workers.size()
+ << " sizeMax: " << _sizeMax;
}
*/
if(interval > _timeout && _pending.size() > 0 && (_workers.size() < _sizeMax || _sizeMax == 0))
{
- if(_traceLevels->subscriberPool > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
- out << "detected stall: creating thread: threads: " << _workers.size();
- }
-
- //
- // We'll now start stall checking at regular intervals if this
- // is the first newly created worker. Here we need to
- // initially set the stall check and the number of requests at
- // this point.
- //
- if(_workers.size() == _size)
- {
- _lastStallCheck = now;
- }
-
- ++_inUse;
- _workers.push_back(new SubscriberPoolWorker(this));
+ if(_traceLevels->subscriberPool > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberPoolCat);
+ out << "detected stall: creating thread: threads: " << _workers.size();
+ }
+
+ //
+ // We'll now start stall checking at regular intervals if this
+ // is the first newly created worker. Here we need to
+ // initially set the stall check and the number of requests at
+ // this point.
+ //
+ if(_workers.size() == _size)
+ {
+ _lastStallCheck = now;
+ }
+
+ ++_inUse;
+ _workers.push_back(new SubscriberPoolWorker(this));
}
}
@@ -512,14 +512,14 @@ SubscriberPool::invariants()
list<SubscriberPtr>::const_iterator p;
for(p = _subscribers.begin(); p != _subscribers.end(); ++p)
{
- assert(subs.find(*p) == subs.end());
- subs.insert(*p);
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
}
subs.clear();
for(p = _pending.begin(); p != _pending.end(); ++p)
{
- assert(subs.find(*p) == subs.end());
- subs.insert(*p);
+ assert(subs.find(*p) == subs.end());
+ subs.insert(*p);
}
return true;
}
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index a88bc1c1744..1831a746ac5 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -35,33 +35,33 @@ class PublisherI : public Ice::BlobjectArray
public:
PublisherI(const TopicIPtr& topic) :
- _topic(topic)
+ _topic(topic)
{
}
virtual bool
ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- Ice::ByteSeq&,
- const Ice::Current& current)
+ Ice::ByteSeq&,
+ const Ice::Current& current)
{
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
+ EventDataPtr event = new EventData(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
- Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
-
- EventDataSeq v;
- v.push_back(event);
- _topic->publish(false, v);
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
+
+ EventDataSeq v;
+ v.push_back(event);
+ _topic->publish(false, v);
- return true;
+ return true;
}
private:
@@ -78,14 +78,14 @@ class TopicLinkI : public TopicLink
public:
TopicLinkI(const TopicIPtr& topic) :
- _topic(topic)
+ _topic(topic)
{
}
virtual void
forward(const EventDataSeq& v, const Ice::Current& current)
{
- _topic->publish(true, v);
+ _topic->publish(true, v);
}
private:
@@ -128,17 +128,17 @@ TopicI::TopicI(
Ice::Identity linkid;
if(id.category.empty())
{
- pubid.category = _name;
- pubid.name = "publish";
- linkid.category = _name;
- linkid.name = "link";
+ pubid.category = _name;
+ pubid.name = "publish";
+ linkid.category = _name;
+ linkid.name = "link";
}
else
{
- pubid.category = id.category;
- pubid.name = _name + ".publish";
- linkid.category = id.category;
- linkid.name = _name + ".link";
+ pubid.category = id.category;
+ pubid.name = _name + ".publish";
+ linkid.category = id.category;
+ linkid.name = _name + ".link";
}
_publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid);
@@ -149,20 +149,20 @@ TopicI::TopicI(
//
for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity());
- }
-
- //
- // Create the subscriber object add it to the set of
- // subscribers.
- //
- SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost);
- _subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity());
+ }
+
+ //
+ // Create the subscriber object add it to the set of
+ // subscribers.
+ //
+ SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost);
+ _subscribers.push_back(subscriber);
+ _instance->subscriberPool()->add(subscriber);
}
}
@@ -193,11 +193,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
{
while(start != end)
{
- if(*start == ident)
- {
- return start;
- }
- ++start;
+ if(*start == ident)
+ {
+ return start;
+ }
+ ++start;
}
return end;
}
@@ -211,74 +211,74 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr
QoS qos = origQoS;
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
- if(traceLevels->topic > 1)
- {
- out << " QoS: ";
- for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
- {
- if(p != qos.begin())
- {
- out << ',';
- }
- out << '[' << p->first << "," << p->second << ']';
- }
- }
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ if(traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
+ {
+ if(p != qos.begin())
+ {
+ out << ',';
+ }
+ out << '[' << p->first << "," << p->second << ']';
+ }
+ }
}
string reliability = "oneway";
{
- QoS::iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- qos.erase(p);
- }
+ QoS::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ qos.erase(p);
+ }
}
Ice::ObjectPrx newObj = obj;
if(reliability == "batch")
{
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
- {
- newObj = newObj->ice_batchOneway();
- }
+ if(newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_batchDatagram();
+ }
+ else
+ {
+ newObj = newObj->ice_batchOneway();
+ }
}
else if(reliability == "twoway")
{
- newObj = newObj->ice_twoway();
+ newObj = newObj->ice_twoway();
}
else if(reliability == "twoway ordered")
{
- qos["reliability"] = "ordered";
- newObj = newObj->ice_twoway();
+ qos["reliability"] = "ordered";
+ newObj = newObj->ice_twoway();
}
else // reliability == "oneway"
{
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
+ if(reliability != "oneway" && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << reliability <<" mode not understood.";
+ }
+ if(!newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_oneway();
+ }
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- (*p)->destroy();
- _instance->subscriberPool()->remove(*p);
- _subscribers.erase(p);
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
}
SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos);
@@ -293,27 +293,27 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
- if(traceLevels->topic > 1)
- {
- out << " QoS: ";
- for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
- {
- if(p != qos.begin())
- {
- out << ',';
- }
- out << '[' << p->first << "," << p->second << ']';
- }
- }
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ if(traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
+ {
+ if(p != qos.begin())
+ {
+ out << ',';
+ }
+ out << '[' << p->first << "," << p->second << ']';
+ }
+ }
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- throw AlreadySubscribed();
+ throw AlreadySubscribed();
}
SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
@@ -329,20 +329,20 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(!subscriber)
{
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "unsubscribe with null subscriber.";
- }
- return;
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "unsubscribe with null subscriber.";
+ }
+ return;
}
Ice::Identity id = subscriber->ice_getIdentity();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Unsubscribe: " << _instance->communicator()->identityToString(id);
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "Unsubscribe: " << _instance->communicator()->identityToString(id);
}
//
@@ -367,7 +367,7 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
reap();
@@ -379,20 +379,20 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
// link.
for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
{
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- LinkExists ex;
- ex.name = name;
- throw ex;
- }
+ if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
+ {
+ LinkExists ex;
+ ex.name = name;
+ throw ex;
+ }
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " link " << _instance->communicator()->identityToString(id)
- << " cost " << cost;
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " link " << _instance->communicator()->identityToString(id)
+ << " cost " << cost;
}
SubscriberPtr subscriber = Subscriber::create(_instance, link, cost);
@@ -422,7 +422,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
reap();
@@ -433,24 +433,24 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
LinkRecordSeq::iterator p = _topicRecord.begin();
while(p != _topicRecord.end())
{
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- break;
- }
- ++p;
+ if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
+ {
+ break;
+ }
+ ++p;
}
if(p == _topicRecord.end())
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << name << " failed - not linked";
- }
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " unlink " << name << " failed - not linked";
+ }
- NoSuchLink ex;
- ex.name = name;
- throw ex;
+ NoSuchLink ex;
+ ex.name = name;
+ throw ex;
}
Ice::ObjectPrx subscriber = p->obj;
@@ -464,8 +464,8 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << _instance->communicator()->identityToString(id);
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " unlink " << _instance->communicator()->identityToString(id);
}
removeSubscriber(subscriber);
}
@@ -481,11 +481,11 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const
for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q)
{
- LinkInfo info;
- info.name = identityToTopicName(q->theTopic->ice_getIdentity());
- info.cost = q->cost;
- info.theTopic = q->theTopic;
- seq.push_back(info);
+ LinkInfo info;
+ info.name = identityToTopicName(q->theTopic->ice_getIdentity());
+ info.cost = q->cost;
+ info.theTopic = q->theTopic;
+ seq.push_back(info);
}
return seq;
@@ -498,18 +498,18 @@ TopicI::destroy(const Ice::Current&)
if(_destroyed)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
_destroyed = true;
try
{
- _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity());
- _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity());
+ _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity());
+ _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity());
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
- // Ignore -- this could occur on shutdown.
+ // Ignore -- this could occur on shutdown.
}
}
@@ -533,7 +533,7 @@ TopicI::reap()
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
- return;
+ return;
}
bool updated = false;
@@ -543,46 +543,46 @@ TopicI::reap()
//
list<SubscriberPtr> error;
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.swap(error);
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.swap(error);
}
TraceLevelsPtr traceLevels = _instance->traceLevels();
for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p)
{
- SubscriberPtr subscriber = *p;
- assert(subscriber->persistent()); // Only persistent subscribers need to be reaped.
-
- bool found = false;
- //
- // If this turns out to be a performance problem then we
- // can create an in memory map cache.
- //
- LinkRecordSeq::iterator q = _topicRecord.begin();
- while(q != _topicRecord.end())
- {
- if(q->obj->ice_getIdentity() == subscriber->id())
- {
- _topicRecord.erase(q);
- updated = true;
- found = true;
- break;
- }
- ++q;
- }
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "reaping " << _instance->communicator()->identityToString(subscriber->id());
- if(!found)
- {
- out << ": failed - not in database";
- }
- }
+ SubscriberPtr subscriber = *p;
+ assert(subscriber->persistent()); // Only persistent subscribers need to be reaped.
+
+ bool found = false;
+ //
+ // If this turns out to be a performance problem then we
+ // can create an in memory map cache.
+ //
+ LinkRecordSeq::iterator q = _topicRecord.begin();
+ while(q != _topicRecord.end())
+ {
+ if(q->obj->ice_getIdentity() == subscriber->id())
+ {
+ _topicRecord.erase(q);
+ updated = true;
+ found = true;
+ break;
+ }
+ ++q;
+ }
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "reaping " << _instance->communicator()->identityToString(subscriber->id());
+ if(!found)
+ {
+ out << ": failed - not in database";
+ }
+ }
}
if(updated)
{
- _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
+ _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
}
}
@@ -595,8 +595,8 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
//
vector<SubscriberPtr> copy;
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- copy = _subscribers;
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ copy = _subscribers;
}
//
@@ -608,18 +608,18 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
list<SubscriberPtr> flush;
for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
{
- Subscriber::QueueState state = (*p)->queue(forwarded, events);
- switch(state)
- {
- case Subscriber::QueueStateError:
- e.push_back((*p)->id());
- break;
- case Subscriber::QueueStateFlush:
- flush.push_back(*p);
- break;
- case Subscriber::QueueStateNoFlush:
- break;
- }
+ Subscriber::QueueState state = (*p)->queue(forwarded, events);
+ switch(state)
+ {
+ case Subscriber::QueueStateError:
+ e.push_back((*p)->id());
+ break;
+ case Subscriber::QueueStateFlush:
+ flush.push_back(*p);
+ break;
+ case Subscriber::QueueStateNoFlush:
+ break;
+ }
}
//
@@ -627,7 +627,7 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
//
if(!flush.empty())
{
- _instance->subscriberPool()->flush(flush);
+ _instance->subscriberPool()->flush(flush);
}
//
@@ -637,48 +637,48 @@ TopicI::publish(bool forwarded, const EventDataSeq& events)
list<SubscriberPtr> reap;
if(!e.empty())
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
- {
- //
- // Its possible for the subscriber to already have been
- // removed since the copy is iterated over outside of
- // mutex protection.
- //
- // Note that although this could be quicker if we used a
- // map, the most optimal case should be pushing around
- // events not searching for a particular subscriber.
- //
- // The subscriber is immediately destroyed & removed from
- // the _subscribers list. If the subscriber is persistent
- // its added to an list of error'd subscribers and removed
- // from the database on the next reap.
- //
- vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
- if(q != _subscribers.end())
- {
- //
- // Destroy the subscriber in any case.
- //
- (*q)->destroy();
- if((*q)->persistent())
- {
- reap.push_back(*q);
- }
- _instance->subscriberPool()->remove(*q);
- _subscribers.erase(q);
- }
- }
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
+ {
+ //
+ // Its possible for the subscriber to already have been
+ // removed since the copy is iterated over outside of
+ // mutex protection.
+ //
+ // Note that although this could be quicker if we used a
+ // map, the most optimal case should be pushing around
+ // events not searching for a particular subscriber.
+ //
+ // The subscriber is immediately destroyed & removed from
+ // the _subscribers list. If the subscriber is persistent
+ // its added to an list of error'd subscribers and removed
+ // from the database on the next reap.
+ //
+ vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
+ if(q != _subscribers.end())
+ {
+ //
+ // Destroy the subscriber in any case.
+ //
+ (*q)->destroy();
+ if((*q)->persistent())
+ {
+ reap.push_back(*q);
+ }
+ _instance->subscriberPool()->remove(*q);
+ _subscribers.erase(q);
+ }
+ }
}
if(!reap.empty())
{
- //
- // This is why _error is a list, so we can splice on the
- // reaped subscribers.
- //
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.splice(_error.begin(), reap);
+ //
+ // This is why _error is a list, so we can splice on the
+ // reaped subscribers.
+ //
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.splice(_error.begin(), reap);
}
}
@@ -691,10 +691,10 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
{
- (*p)->destroy();
- _instance->subscriberPool()->remove(*p);
- _subscribers.erase(p);
- return;
+ (*p)->destroy();
+ _instance->subscriberPool()->remove(*p);
+ _subscribers.erase(p);
+ return;
}
//
@@ -703,7 +703,7 @@ TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _instance->communicator()->identityToString(id) << ": not subscribed.";
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _instance->communicator()->identityToString(id) << ": not subscribed.";
}
}
diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h
index 2ec61aee3c2..8db61085fa3 100644
--- a/cpp/src/IceStorm/TopicI.h
+++ b/cpp/src/IceStorm/TopicI.h
@@ -29,7 +29,7 @@ class TopicI : public TopicInternal
public:
TopicI(const InstancePtr&, const std::string&, const Ice::Identity&, const LinkRecordSeq&, const std::string&,
- const std::string&);
+ const std::string&);
~TopicI();
virtual std::string getName(const Ice::Current&) const;
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 12c9706ceaa..f8ab5fe80bd 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -37,7 +37,7 @@ identityToTopicName(const Ice::Identity& id)
//
if(id.category.empty())
{
- return id.name;
+ return id.name;
}
assert(id.name.length() > 6 && id.name.compare(0, 6, "topic.") == 0);
@@ -63,7 +63,7 @@ TopicManagerI::TopicManagerI(
//
for(PersistentTopicMap::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
{
- installTopic(identityToTopicName(p->first), p->first, p->second, false);
+ installTopic(identityToTopicName(p->first), p->first, p->second, false);
}
}
@@ -80,7 +80,7 @@ TopicManagerI::create(const string& name, const Ice::Current&)
if(_topicIMap.find(name) != _topicIMap.end())
{
TopicExists ex;
- ex.name = name;
+ ex.name = name;
throw ex;
}
@@ -105,9 +105,9 @@ TopicManagerI::retrieve(const string& name, const Ice::Current&) const
TopicIMap::const_iterator p = _topicIMap.find(name);
if(p == _topicIMap.end())
{
- NoSuchTopic ex;
- ex.name = name;
- throw ex;
+ NoSuchTopic ex;
+ ex.name = name;
+ throw ex;
}
// Here we cannot just reconstruct the identity since the
@@ -127,13 +127,13 @@ TopicManagerI::retrieveAll(const Ice::Current&) const
TopicDict all;
for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p)
{
- //
- // Here we cannot just reconstruct the identity since the
- // identity could be either "<instanceName>/topic.<topicname>"
- // name, or if created with pre-3.2 IceStorm "/<topicname>".
- //
+ //
+ // Here we cannot just reconstruct the identity since the
+ // identity could be either "<instanceName>/topic.<topicname>"
+ // name, or if created with pre-3.2 IceStorm "/<topicname>".
+ //
all.insert(TopicDict::value_type(
- p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id()))));
+ p->first, TopicPrx::uncheckedCast(_topicAdapter->createProxy(p->second->id()))));
}
return all;
@@ -158,8 +158,8 @@ TopicManagerI::reap()
{
if(i->second->destroyed())
{
- Ice::Identity id = i->second->id();
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ Ice::Identity id = i->second->id();
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topicMgr > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
@@ -195,7 +195,7 @@ TopicManagerI::shutdown()
for(TopicIMap::const_iterator p = _topicIMap.begin(); p != _topicIMap.end(); ++p)
{
- p->second->reap();
+ p->second->reap();
}
}
@@ -208,17 +208,17 @@ TopicManagerI::installTopic(const string& name, const Ice::Identity& id, const L
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topicMgr > 0)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
- if(create)
- {
- out << "creating new topic \"" << name << "\". id: "
- << _instance->communicator()->identityToString(id);
- }
- else
- {
- out << "loading topic \"" << name << "\" from database. id: "
- << _instance->communicator()->identityToString(id);
- }
+ Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
+ if(create)
+ {
+ out << "creating new topic \"" << name << "\". id: "
+ << _instance->communicator()->identityToString(id);
+ }
+ else
+ {
+ out << "loading topic \"" << name << "\" from database. id: "
+ << _instance->communicator()->identityToString(id);
+ }
}
//
diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h
index 61e6f30e223..ebddf8a9156 100644
--- a/cpp/src/IceStorm/TopicManagerI.h
+++ b/cpp/src/IceStorm/TopicManagerI.h
@@ -38,9 +38,9 @@ class TopicManagerI : public TopicManager, public IceUtil::Mutex
public:
TopicManagerI(const InstancePtr&,
- const Ice::ObjectAdapterPtr&,
- const std::string&,
- const std::string&);
+ const Ice::ObjectAdapterPtr&,
+ const std::string&,
+ const std::string&);
~TopicManagerI();
virtual TopicPrx create(const std::string&, const Ice::Current&);