summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/stress/Subscriber.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2007-01-30 02:03:59 +0000
committerMatthew Newhook <matthew@zeroc.com>2007-01-30 02:03:59 +0000
commit62a0f9e2b11854cb04fd651fd811ef530cde81b4 (patch)
tree49cc426c929ef9e381f52782d7071cf07a7a8b7b /cpp/test/IceStorm/stress/Subscriber.cpp
parentuse thread model consistently (diff)
downloadice-62a0f9e2b11854cb04fd651fd811ef530cde81b4.tar.bz2
ice-62a0f9e2b11854cb04fd651fd811ef530cde81b4.tar.xz
ice-62a0f9e2b11854cb04fd651fd811ef530cde81b4.zip
Added IceStorm stress test.
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp391
1 files changed, 391 insertions, 0 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
new file mode 100644
index 00000000000..81cc980470e
--- /dev/null
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -0,0 +1,391 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <IceUtil/Options.h>
+#include <IceUtil/Thread.h>
+#include <IceUtil/Time.h>
+#include <Event.h>
+
+#include <TestCommon.h>
+
+#include <fcntl.h>
+#ifdef _WIN32
+# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/stat.h>
+#endif
+
+using namespace std;
+using namespace Ice;
+using namespace IceStorm;
+using namespace Test;
+
+class EventI : public Event, public IceUtil::Mutex
+{
+public:
+
+ EventI(const CommunicatorPtr& communicator, int total) :
+ _communicator(communicator), _total(total), _count(0)
+ {
+ }
+
+ int count() const
+ {
+ Lock sync(*this);
+ return _count;
+ }
+
+protected:
+
+ const CommunicatorPtr _communicator;
+ const int _total;
+ int _count;
+};
+typedef IceUtil::Handle<EventI> EventIPtr;
+
+class OrderEventI : public EventI
+{
+public:
+
+ OrderEventI(const CommunicatorPtr& communicator, int total) :
+ EventI(communicator, total)
+ {
+ }
+
+ virtual void
+ pub(int counter, const Ice::Current&)
+ {
+ Lock sync(*this);
+
+ if(counter != _count || counter == _total-1)
+ {
+ if(counter != _count)
+ {
+ cerr << "failed! expected event: " << _count << " received event: " << counter << endl;
+ }
+ _communicator->shutdown();
+ }
+ _count++;
+ }
+};
+
+class CountEventI : public EventI
+{
+public:
+
+ CountEventI(const CommunicatorPtr& communicator, int total) :
+ EventI(communicator, total)
+ {
+ }
+
+ virtual void
+ pub(int, const Ice::Current&)
+ {
+ Lock sync(*this);
+
+ if(++_count == _total)
+ {
+ _communicator->shutdown();
+ }
+ }
+};
+
+class SlowEventI : public EventI
+{
+public:
+
+ SlowEventI(const CommunicatorPtr& communicator, int total) :
+ EventI(communicator, total)
+ {
+ }
+
+ virtual void
+ pub(int, const Ice::Current&)
+ {
+ Lock sync(*this);
+
+ //
+ // Ignore events over and above the expected.
+ //
+ if(_count >= _total)
+ {
+ return;
+ }
+ // Sleep for 3 seconds
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3));
+ if(++_count == _total)
+ {
+ _communicator->shutdown();
+ }
+ }
+};
+
+class ErraticEventI : public EventI
+{
+public:
+
+ ErraticEventI(const CommunicatorPtr& communicator, int total) :
+ EventI(communicator, total), _done(false)
+ {
+ IceUtil::StaticMutex::Lock sync(_remainingMutex);
+ ++_remaining;
+ }
+
+ virtual void
+ pub(int, const Ice::Current& current)
+ {
+ Lock sync(*this);
+
+ // Randomly close the connection.
+ if(!_done && (random() % 10 == 1 || ++_count == _total))
+ {
+ _done = true;
+ current.con->close(true);
+ _count = _total;
+ {
+ IceUtil::StaticMutex::Lock sync(_remainingMutex);
+ --_remaining;
+ if(_remaining == 0)
+ {
+ _communicator->shutdown();
+ }
+ }
+ }
+ }
+
+private:
+
+ static IceUtil::StaticMutex _remainingMutex;
+ static int _remaining;
+ bool _done;
+};
+
+IceUtil::StaticMutex ErraticEventI::_remainingMutex = ICE_STATIC_MUTEX_INITIALIZER;
+int ErraticEventI::_remaining = 0;
+
+void
+usage(const char* appName)
+{
+ cerr << "Usage: " << appName << " [options] [lockfile]\n";
+ // XXX:
+ cerr <<
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-b Use batch reliability.\n"
+ ;
+}
+
+struct Subscription
+{
+ Ice::ObjectAdapterPtr adapter;
+ Ice::ObjectPrx obj;
+ EventIPtr servant;
+ IceStorm::QoS qos;
+};
+
+int
+run(int argc, char* argv[], const CommunicatorPtr& communicator)
+{
+ IceUtil::Options opts;
+ opts.addOpt("", "events", IceUtil::Options::NeedArg);
+ opts.addOpt("", "qos", IceUtil::Options::NeedArg, "", IceUtil::Options::Repeat);
+ opts.addOpt("", "slow");
+ opts.addOpt("", "erratic", IceUtil::Options::NeedArg);
+
+ try
+ {
+ opts.parse(argc, argv);
+ }
+ catch(const IceUtil::BadOptException& e)
+ {
+ cerr << argv[0] << ": " << e.reason << endl;
+ return EXIT_FAILURE;
+ }
+
+ int events = 1000;
+ string s = opts.optArg("events");
+ if(!s.empty())
+ {
+ events = atoi(s.c_str());
+ }
+ if(events <= 0)
+ {
+ cerr << argv[0] << ": events must be > 0." << endl;
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::QoS qos;
+
+ vector<string> sqos = opts.argVec("qos");
+ for(vector<string>::const_iterator q = sqos.begin(); q != sqos.end(); ++q)
+ {
+ string::size_type off = q->find(",");
+ if(off == string::npos)
+ {
+ cerr << argv[0] << ": parse error: no , in QoS" << endl;
+ return EXIT_FAILURE;
+ }
+ qos[q->substr(0, off)] = q->substr(off+1);
+ }
+
+ bool slow = opts.isSet("slow");
+ bool erratic = false;
+ int erraticNum = 0;
+ s = opts.optArg("erratic");
+ if(!s.empty())
+ {
+ erratic = true;
+ erraticNum = atoi(s.c_str());
+ }
+ if(events <= 0)
+ {
+ cerr << argv[0] << ": events must be > 0." << endl;
+ return EXIT_FAILURE;
+ }
+
+ PropertiesPtr properties = communicator->getProperties();
+ const char* managerProxyProperty = "IceStorm.TopicManager.Proxy";
+ string managerProxy = properties->getProperty(managerProxyProperty);
+ if(managerProxy.empty())
+ {
+ cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl;
+ return EXIT_FAILURE;
+ }
+
+ IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
+ communicator->stringToProxy(managerProxy));
+ if(!manager)
+ {
+ cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl;
+ return EXIT_FAILURE;
+ }
+
+ vector<Subscription> subs;
+
+ ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default");
+
+ EventIPtr servant;
+ if(erratic)
+ {
+ for(int i = 0 ; i< erraticNum; ++i)
+ {
+ ostringstream os;
+ os << "SubscriberAdapter" << i;
+ Subscription item;
+ item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default");
+ item.servant = new ErraticEventI(communicator, events);
+ item.qos["reliability"] = "twoway";
+ subs.push_back(item);
+ }
+ }
+ else if(slow)
+ {
+ servant = new SlowEventI(communicator, events);
+ }
+ else
+ {
+ map<string, string>::const_iterator reliability = qos.find("reliability");
+ if(reliability != qos.end())
+ {
+ if(reliability->second == "twoway ordered")
+ {
+ servant = new OrderEventI(communicator, events);
+ }
+ }
+ if(!servant)
+ {
+ servant = new CountEventI(communicator, events);
+ }
+ }
+
+ //
+ // Activate the servants.
+ //
+ if(subs.empty())
+ {
+ Subscription item;
+ item.adapter = adapter;
+ item.servant = servant;
+ item.qos = qos;
+ subs.push_back(item);
+ }
+
+ TopicPrx topic;
+ try
+ {
+ topic = manager->retrieve("fed1");
+ }
+ catch(const IceStorm::NoSuchTopic& e)
+ {
+ cerr << argv[0] << ": NoSuchTopic: " << e.name << endl;
+ return EXIT_FAILURE;
+ }
+
+ {
+ for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
+ {
+ p->obj = p->adapter->addWithUUID(p->servant);
+ p->adapter->activate();
+ topic->subscribe(qos, p->obj);
+ }
+ }
+
+ communicator->waitForShutdown();
+
+ {
+ for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p)
+ {
+ topic->unsubscribe(p->obj);
+ if(p->servant->count() != events)
+ {
+ cerr << "expected " << events << " events but got " << p->servant->count() << " events." << endl;
+ return EXIT_FAILURE;
+ }
+ }
+ }
+
+ return EXIT_SUCCESS;
+}
+
+int
+main(int argc, char* argv[])
+{
+ int status;
+ CommunicatorPtr communicator;
+
+ try
+ {
+ communicator = initialize(argc, argv);
+ status = run(argc, argv, communicator);
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+
+ if(communicator)
+ {
+ try
+ {
+ communicator->destroy();
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
+ }
+
+ return status;
+}