summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/stress/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp306
1 files changed, 153 insertions, 153 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
index 0a10931b223..01590e4702f 100644
--- a/cpp/test/IceStorm/stress/Subscriber.cpp
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -28,14 +28,14 @@ class EventI : public Event, public IceUtil::Mutex
public:
EventI(const CommunicatorPtr& communicator, int total) :
- _communicator(communicator), _total(total), _count(0)
+ _communicator(communicator), _total(total), _count(0)
{
}
int count() const
{
- Lock sync(*this);
- return _count;
+ Lock sync(*this);
+ return _count;
}
protected:
@@ -51,24 +51,24 @@ class OrderEventI : public EventI
public:
OrderEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ EventI(communicator, total)
{
}
virtual void
pub(int counter, const Ice::Current&)
{
- Lock sync(*this);
-
- if(counter != _count || counter == _total-1)
- {
- if(counter != _count)
- {
- cerr << "failed! expected event: " << _count << " received event: " << counter << endl;
- }
- _communicator->shutdown();
- }
- _count++;
+ Lock sync(*this);
+
+ if(counter != _count || counter == _total-1)
+ {
+ if(counter != _count)
+ {
+ cerr << "failed! expected event: " << _count << " received event: " << counter << endl;
+ }
+ _communicator->shutdown();
+ }
+ _count++;
}
};
@@ -77,19 +77,19 @@ class CountEventI : public EventI
public:
CountEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ EventI(communicator, total)
{
}
virtual void
pub(int, const Ice::Current&)
{
- Lock sync(*this);
+ Lock sync(*this);
- if(++_count == _total)
- {
- _communicator->shutdown();
- }
+ if(++_count == _total)
+ {
+ _communicator->shutdown();
+ }
}
};
@@ -98,28 +98,28 @@ class SlowEventI : public EventI
public:
SlowEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ EventI(communicator, total)
{
}
virtual void
pub(int, const Ice::Current&)
{
- Lock sync(*this);
-
- //
- // Ignore events over and above the expected.
- //
- if(_count >= _total)
- {
- return;
- }
- // Sleep for 3 seconds
- IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3));
- if(++_count == _total)
- {
- _communicator->shutdown();
- }
+ Lock sync(*this);
+
+ //
+ // Ignore events over and above the expected.
+ //
+ if(_count >= _total)
+ {
+ return;
+ }
+ // Sleep for 3 seconds
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3));
+ if(++_count == _total)
+ {
+ _communicator->shutdown();
+ }
}
};
@@ -128,32 +128,32 @@ class ErraticEventI : public EventI
public:
ErraticEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total), _done(false)
+ EventI(communicator, total), _done(false)
{
- IceUtil::StaticMutex::Lock sync(_remainingMutex);
- ++_remaining;
+ IceUtil::StaticMutex::Lock sync(_remainingMutex);
+ ++_remaining;
}
virtual void
pub(int, const Ice::Current& current)
{
- Lock sync(*this);
-
- // Randomly close the connection.
- if(!_done && (IceUtil::random(10) == 1 || ++_count == _total))
- {
- _done = true;
- current.con->close(true);
- _count = _total;
- {
- IceUtil::StaticMutex::Lock sync(_remainingMutex);
- --_remaining;
- if(_remaining == 0)
- {
- _communicator->shutdown();
- }
- }
- }
+ Lock sync(*this);
+
+ // Randomly close the connection.
+ if(!_done && (IceUtil::random(10) == 1 || ++_count == _total))
+ {
+ _done = true;
+ current.con->close(true);
+ _count = _total;
+ {
+ IceUtil::StaticMutex::Lock sync(_remainingMutex);
+ --_remaining;
+ if(_remaining == 0)
+ {
+ _communicator->shutdown();
+ }
+ }
+ }
}
private:
@@ -189,20 +189,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
}
catch(const IceUtil::BadOptException& e)
{
- cerr << argv[0] << ": " << e.reason << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": " << e.reason << endl;
+ return EXIT_FAILURE;
}
int events = 1000;
string s = opts.optArg("events");
if(!s.empty())
{
- events = atoi(s.c_str());
+ events = atoi(s.c_str());
}
if(events <= 0)
{
- cerr << argv[0] << ": events must be > 0." << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": events must be > 0." << endl;
+ return EXIT_FAILURE;
}
IceStorm::QoS qos;
@@ -210,13 +210,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
vector<string> sqos = opts.argVec("qos");
for(vector<string>::const_iterator q = sqos.begin(); q != sqos.end(); ++q)
{
- string::size_type off = q->find(",");
- if(off == string::npos)
- {
- cerr << argv[0] << ": parse error: no , in QoS" << endl;
- return EXIT_FAILURE;
- }
- qos[q->substr(0, off)] = q->substr(off+1);
+ string::size_type off = q->find(",");
+ if(off == string::npos)
+ {
+ cerr << argv[0] << ": parse error: no , in QoS" << endl;
+ return EXIT_FAILURE;
+ }
+ qos[q->substr(0, off)] = q->substr(off+1);
}
bool slow = opts.isSet("slow");
@@ -225,13 +225,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
s = opts.optArg("erratic");
if(!s.empty())
{
- erratic = true;
- erraticNum = atoi(s.c_str());
+ erratic = true;
+ erraticNum = atoi(s.c_str());
}
if(events <= 0)
{
- cerr << argv[0] << ": events must be > 0." << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": events must be > 0." << endl;
+ return EXIT_FAILURE;
}
PropertiesPtr properties = communicator->getProperties();
@@ -239,16 +239,16 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
string managerProxy = properties->getProperty(managerProxyProperty);
if(managerProxy.empty())
{
- cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl;
+ return EXIT_FAILURE;
}
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
- communicator->stringToProxy(managerProxy));
+ communicator->stringToProxy(managerProxy));
if(!manager)
{
- cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl;
+ return EXIT_FAILURE;
}
vector<Subscription> subs;
@@ -259,40 +259,40 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
EventIPtr servant;
if(erratic)
{
- for(int i = 0 ; i< erraticNum; ++i)
- {
- ostringstream os;
- os << "SubscriberAdapter" << i;
- Subscription item;
- item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default");
- item.servant = new ErraticEventI(communicator, events);
- item.qos["reliability"] = "twoway";
- subs.push_back(item);
- }
+ for(int i = 0 ; i< erraticNum; ++i)
+ {
+ ostringstream os;
+ os << "SubscriberAdapter" << i;
+ Subscription item;
+ item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default");
+ item.servant = new ErraticEventI(communicator, events);
+ item.qos["reliability"] = "twoway";
+ subs.push_back(item);
+ }
}
else if(slow)
{
- servant = new SlowEventI(communicator, events);
+ servant = new SlowEventI(communicator, events);
}
else
{
- map<string, string>::iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- if(reliability != "ordered")
- {
- qos.erase(p);
- }
- }
- if(reliability == "ordered")
- {
- servant = new OrderEventI(communicator, events);
- }
- else
- {
- servant = new CountEventI(communicator, events);
- }
+ map<string, string>::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ if(reliability != "ordered")
+ {
+ qos.erase(p);
+ }
+ }
+ if(reliability == "ordered")
+ {
+ servant = new OrderEventI(communicator, events);
+ }
+ else
+ {
+ servant = new CountEventI(communicator, events);
+ }
}
//
@@ -300,11 +300,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
//
if(subs.empty())
{
- Subscription item;
- item.adapter = adapter;
- item.servant = servant;
- item.qos = qos;
- subs.push_back(item);
+ Subscription item;
+ item.adapter = adapter;
+ item.servant = servant;
+ item.qos = qos;
+ subs.push_back(item);
}
TopicPrx topic;
@@ -314,43 +314,43 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
}
catch(const IceStorm::NoSuchTopic& e)
{
- cerr << argv[0] << ": NoSuchTopic: " << e.name << endl;
- return EXIT_FAILURE;
+ cerr << argv[0] << ": NoSuchTopic: " << e.name << endl;
+ return EXIT_FAILURE;
}
{
- for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
- {
- p->obj = p->adapter->addWithUUID(p->servant);
- if(reliability == "twoway" || reliability == "ordered")
- {
- // Do nothing.
- }
- else if(reliability == "batch")
- {
- p->obj = p->obj->ice_batchOneway();
- }
- else //if(reliability == "oneway")
- {
- p->obj = p->obj->ice_oneway();
- }
- p->adapter->activate();
- topic->subscribeAndGetPublisher(qos, p->obj);
- }
+ for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
+ {
+ p->obj = p->adapter->addWithUUID(p->servant);
+ if(reliability == "twoway" || reliability == "ordered")
+ {
+ // Do nothing.
+ }
+ else if(reliability == "batch")
+ {
+ p->obj = p->obj->ice_batchOneway();
+ }
+ else //if(reliability == "oneway")
+ {
+ p->obj = p->obj->ice_oneway();
+ }
+ p->adapter->activate();
+ topic->subscribeAndGetPublisher(qos, p->obj);
+ }
}
communicator->waitForShutdown();
{
- for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p)
- {
- topic->unsubscribe(p->obj);
- if(p->servant->count() != events)
- {
- cerr << "expected " << events << " events but got " << p->servant->count() << " events." << endl;
- return EXIT_FAILURE;
- }
- }
+ for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p)
+ {
+ topic->unsubscribe(p->obj);
+ if(p->servant->count() != events)
+ {
+ cerr << "expected " << events << " events but got " << p->servant->count() << " events." << endl;
+ return EXIT_FAILURE;
+ }
+ }
}
return EXIT_SUCCESS;
@@ -364,26 +364,26 @@ main(int argc, char* argv[])
try
{
- communicator = initialize(argc, argv);
- status = run(argc, argv, communicator);
+ communicator = initialize(argc, argv);
+ status = run(argc, argv, communicator);
}
catch(const Exception& ex)
{
- cerr << ex << endl;
- status = EXIT_FAILURE;
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
}
if(communicator)
{
- try
- {
- communicator->destroy();
- }
- catch(const Exception& ex)
- {
- cerr << ex << endl;
- status = EXIT_FAILURE;
- }
+ try
+ {
+ communicator->destroy();
+ }
+ catch(const Exception& ex)
+ {
+ cerr << ex << endl;
+ status = EXIT_FAILURE;
+ }
}
return status;