diff options
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r-- | cpp/test/IceStorm/stress/Subscriber.cpp | 306 |
1 files changed, 153 insertions, 153 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp index 0a10931b223..01590e4702f 100644 --- a/cpp/test/IceStorm/stress/Subscriber.cpp +++ b/cpp/test/IceStorm/stress/Subscriber.cpp @@ -28,14 +28,14 @@ class EventI : public Event, public IceUtil::Mutex public: EventI(const CommunicatorPtr& communicator, int total) : - _communicator(communicator), _total(total), _count(0) + _communicator(communicator), _total(total), _count(0) { } int count() const { - Lock sync(*this); - return _count; + Lock sync(*this); + return _count; } protected: @@ -51,24 +51,24 @@ class OrderEventI : public EventI public: OrderEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + EventI(communicator, total) { } virtual void pub(int counter, const Ice::Current&) { - Lock sync(*this); - - if(counter != _count || counter == _total-1) - { - if(counter != _count) - { - cerr << "failed! expected event: " << _count << " received event: " << counter << endl; - } - _communicator->shutdown(); - } - _count++; + Lock sync(*this); + + if(counter != _count || counter == _total-1) + { + if(counter != _count) + { + cerr << "failed! expected event: " << _count << " received event: " << counter << endl; + } + _communicator->shutdown(); + } + _count++; } }; @@ -77,19 +77,19 @@ class CountEventI : public EventI public: CountEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + EventI(communicator, total) { } virtual void pub(int, const Ice::Current&) { - Lock sync(*this); + Lock sync(*this); - if(++_count == _total) - { - _communicator->shutdown(); - } + if(++_count == _total) + { + _communicator->shutdown(); + } } }; @@ -98,28 +98,28 @@ class SlowEventI : public EventI public: SlowEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + EventI(communicator, total) { } virtual void pub(int, const Ice::Current&) { - Lock sync(*this); - - // - // Ignore events over and above the expected. - // - if(_count >= _total) - { - return; - } - // Sleep for 3 seconds - IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3)); - if(++_count == _total) - { - _communicator->shutdown(); - } + Lock sync(*this); + + // + // Ignore events over and above the expected. + // + if(_count >= _total) + { + return; + } + // Sleep for 3 seconds + IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3)); + if(++_count == _total) + { + _communicator->shutdown(); + } } }; @@ -128,32 +128,32 @@ class ErraticEventI : public EventI public: ErraticEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total), _done(false) + EventI(communicator, total), _done(false) { - IceUtil::StaticMutex::Lock sync(_remainingMutex); - ++_remaining; + IceUtil::StaticMutex::Lock sync(_remainingMutex); + ++_remaining; } virtual void pub(int, const Ice::Current& current) { - Lock sync(*this); - - // Randomly close the connection. - if(!_done && (IceUtil::random(10) == 1 || ++_count == _total)) - { - _done = true; - current.con->close(true); - _count = _total; - { - IceUtil::StaticMutex::Lock sync(_remainingMutex); - --_remaining; - if(_remaining == 0) - { - _communicator->shutdown(); - } - } - } + Lock sync(*this); + + // Randomly close the connection. + if(!_done && (IceUtil::random(10) == 1 || ++_count == _total)) + { + _done = true; + current.con->close(true); + _count = _total; + { + IceUtil::StaticMutex::Lock sync(_remainingMutex); + --_remaining; + if(_remaining == 0) + { + _communicator->shutdown(); + } + } + } } private: @@ -189,20 +189,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) } catch(const IceUtil::BadOptException& e) { - cerr << argv[0] << ": " << e.reason << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": " << e.reason << endl; + return EXIT_FAILURE; } int events = 1000; string s = opts.optArg("events"); if(!s.empty()) { - events = atoi(s.c_str()); + events = atoi(s.c_str()); } if(events <= 0) { - cerr << argv[0] << ": events must be > 0." << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": events must be > 0." << endl; + return EXIT_FAILURE; } IceStorm::QoS qos; @@ -210,13 +210,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) vector<string> sqos = opts.argVec("qos"); for(vector<string>::const_iterator q = sqos.begin(); q != sqos.end(); ++q) { - string::size_type off = q->find(","); - if(off == string::npos) - { - cerr << argv[0] << ": parse error: no , in QoS" << endl; - return EXIT_FAILURE; - } - qos[q->substr(0, off)] = q->substr(off+1); + string::size_type off = q->find(","); + if(off == string::npos) + { + cerr << argv[0] << ": parse error: no , in QoS" << endl; + return EXIT_FAILURE; + } + qos[q->substr(0, off)] = q->substr(off+1); } bool slow = opts.isSet("slow"); @@ -225,13 +225,13 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) s = opts.optArg("erratic"); if(!s.empty()) { - erratic = true; - erraticNum = atoi(s.c_str()); + erratic = true; + erraticNum = atoi(s.c_str()); } if(events <= 0) { - cerr << argv[0] << ": events must be > 0." << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": events must be > 0." << endl; + return EXIT_FAILURE; } PropertiesPtr properties = communicator->getProperties(); @@ -239,16 +239,16 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) string managerProxy = properties->getProperty(managerProxyProperty); if(managerProxy.empty()) { - cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": property `" << managerProxyProperty << "' is not set" << endl; + return EXIT_FAILURE; } IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( - communicator->stringToProxy(managerProxy)); + communicator->stringToProxy(managerProxy)); if(!manager) { - cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": `" << managerProxy << "' is not running" << endl; + return EXIT_FAILURE; } vector<Subscription> subs; @@ -259,40 +259,40 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) EventIPtr servant; if(erratic) { - for(int i = 0 ; i< erraticNum; ++i) - { - ostringstream os; - os << "SubscriberAdapter" << i; - Subscription item; - item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default"); - item.servant = new ErraticEventI(communicator, events); - item.qos["reliability"] = "twoway"; - subs.push_back(item); - } + for(int i = 0 ; i< erraticNum; ++i) + { + ostringstream os; + os << "SubscriberAdapter" << i; + Subscription item; + item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default"); + item.servant = new ErraticEventI(communicator, events); + item.qos["reliability"] = "twoway"; + subs.push_back(item); + } } else if(slow) { - servant = new SlowEventI(communicator, events); + servant = new SlowEventI(communicator, events); } else { - map<string, string>::iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - if(reliability != "ordered") - { - qos.erase(p); - } - } - if(reliability == "ordered") - { - servant = new OrderEventI(communicator, events); - } - else - { - servant = new CountEventI(communicator, events); - } + map<string, string>::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + if(reliability != "ordered") + { + qos.erase(p); + } + } + if(reliability == "ordered") + { + servant = new OrderEventI(communicator, events); + } + else + { + servant = new CountEventI(communicator, events); + } } // @@ -300,11 +300,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) // if(subs.empty()) { - Subscription item; - item.adapter = adapter; - item.servant = servant; - item.qos = qos; - subs.push_back(item); + Subscription item; + item.adapter = adapter; + item.servant = servant; + item.qos = qos; + subs.push_back(item); } TopicPrx topic; @@ -314,43 +314,43 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) } catch(const IceStorm::NoSuchTopic& e) { - cerr << argv[0] << ": NoSuchTopic: " << e.name << endl; - return EXIT_FAILURE; + cerr << argv[0] << ": NoSuchTopic: " << e.name << endl; + return EXIT_FAILURE; } { - for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) - { - p->obj = p->adapter->addWithUUID(p->servant); - if(reliability == "twoway" || reliability == "ordered") - { - // Do nothing. - } - else if(reliability == "batch") - { - p->obj = p->obj->ice_batchOneway(); - } - else //if(reliability == "oneway") - { - p->obj = p->obj->ice_oneway(); - } - p->adapter->activate(); - topic->subscribeAndGetPublisher(qos, p->obj); - } + for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) + { + p->obj = p->adapter->addWithUUID(p->servant); + if(reliability == "twoway" || reliability == "ordered") + { + // Do nothing. + } + else if(reliability == "batch") + { + p->obj = p->obj->ice_batchOneway(); + } + else //if(reliability == "oneway") + { + p->obj = p->obj->ice_oneway(); + } + p->adapter->activate(); + topic->subscribeAndGetPublisher(qos, p->obj); + } } communicator->waitForShutdown(); { - for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p) - { - topic->unsubscribe(p->obj); - if(p->servant->count() != events) - { - cerr << "expected " << events << " events but got " << p->servant->count() << " events." << endl; - return EXIT_FAILURE; - } - } + for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p) + { + topic->unsubscribe(p->obj); + if(p->servant->count() != events) + { + cerr << "expected " << events << " events but got " << p->servant->count() << " events." << endl; + return EXIT_FAILURE; + } + } } return EXIT_SUCCESS; @@ -364,26 +364,26 @@ main(int argc, char* argv[]) try { - communicator = initialize(argc, argv); - status = run(argc, argv, communicator); + communicator = initialize(argc, argv); + status = run(argc, argv, communicator); } catch(const Exception& ex) { - cerr << ex << endl; - status = EXIT_FAILURE; + cerr << ex << endl; + status = EXIT_FAILURE; } if(communicator) { - try - { - communicator->destroy(); - } - catch(const Exception& ex) - { - cerr << ex << endl; - status = EXIT_FAILURE; - } + try + { + communicator->destroy(); + } + catch(const Exception& ex) + { + cerr << ex << endl; + status = EXIT_FAILURE; + } } return status; |