diff options
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r-- | cpp/test/IceStorm/stress/Subscriber.cpp | 231 |
1 files changed, 96 insertions, 135 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp index 77d5dd9e029..c7e3cdd3c2b 100644 --- a/cpp/test/IceStorm/stress/Subscriber.cpp +++ b/cpp/test/IceStorm/stress/Subscriber.cpp @@ -5,33 +5,29 @@ #include <Ice/Ice.h> #include <IceStorm/IceStorm.h> #include <IceUtil/Options.h> -#include <IceUtil/Thread.h> -#include <IceUtil/Time.h> -#include <IceUtil/Random.h> #include <Event.h> -#include <IceUtil/Mutex.h> -#include <IceUtil/MutexPtrLock.h> #include <TestHelper.h> +#include <random> using namespace std; using namespace Ice; using namespace IceStorm; using namespace Test; -struct Subscription; // Forward declaration. +struct Subscription; -class EventI : public Event, public IceUtil::Mutex +class EventI : public Event { public: - EventI(const CommunicatorPtr& communicator, int total) : - _communicator(communicator), _total(total), _count(0) + EventI(shared_ptr<Communicator> communicator, int total) : + _communicator(move(communicator)), _total(total), _count(0) { } - int count() const + int count() { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); return _count; } @@ -41,41 +37,36 @@ public: protected: - const CommunicatorPtr _communicator; + shared_ptr<Communicator> _communicator; const int _total; int _count; + mutex _mutex; }; -typedef IceUtil::Handle<EventI> EventIPtr; -struct Subscription +struct Subscription final { - Subscription() : activate(true) - { - } - - Ice::ObjectAdapterPtr adapter; - Ice::ObjectPrx obj; - EventIPtr servant; + shared_ptr<Ice::ObjectAdapter> adapter; + shared_ptr<Ice::ObjectPrx> obj; + shared_ptr<EventI> servant; IceStorm::QoS qos; - Ice::ObjectPrx publisher; - bool activate; + shared_ptr<Ice::ObjectPrx> publisher; + bool activate = true; }; -class OrderEventI : public EventI +class OrderEventI final : public EventI { public: - OrderEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + OrderEventI(shared_ptr<Communicator> communicator, int total) : + EventI(move(communicator), total) { } - virtual void - pub(int counter, const Ice::Current&) + void + pub(int counter, const Ice::Current&) override { - Lock sync(*this); - - if(counter != _count || counter == _total-1) + lock_guard<mutex> lg(_mutex); + if(counter != _count || counter == _total - 1) { if(counter != _count) { @@ -87,20 +78,19 @@ public: } }; -class CountEventI : public EventI +class CountEventI final : public EventI { public: - CountEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + CountEventI(shared_ptr<Communicator> communicator, int total) : + EventI(move(communicator), total) { } - virtual void - pub(int, const Ice::Current&) + void + pub(int, const Ice::Current&) override { - Lock sync(*this); - + lock_guard<mutex> lg(_mutex); if(++_count == _total) { _communicator->shutdown(); @@ -108,20 +98,19 @@ public: } }; -class SlowEventI : public EventI +class SlowEventI final : public EventI { public: - SlowEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total) + SlowEventI(shared_ptr<Communicator> communicator, int total) : + EventI(move(communicator), total) { } - virtual void - pub(int, const Ice::Current&) + void + pub(int, const Ice::Current&) override { - Lock sync(*this); - + lock_guard<mutex> lg(_mutex); // // Ignore events over and above the expected. // @@ -130,7 +119,7 @@ public: return; } // Sleep for 3 seconds - IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3)); + this_thread::sleep_for(3s); if(++_count == _total) { _communicator->shutdown(); @@ -138,35 +127,32 @@ public: } }; -class ErraticEventI : public EventI +class ErraticEventI final : public EventI { public: - ErraticEventI(const CommunicatorPtr& communicator, int total) : - EventI(communicator, total), _done(false) + ErraticEventI(shared_ptr<Communicator> communicator, int total) : + EventI(move(communicator), total) { - IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync(_remainingMutex); ++_remaining; } - virtual void - pub(int, const Ice::Current& current) + void + pub(int, const Ice::Current& current) override { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); // Randomly close the connection. - if(!_done && (IceUtilInternal::random(10) == 1 || ++_count == _total)) + if(!_done && (_rd() % 10 == 1 || ++_count == _total)) { _done = true; - current.con->close(ICE_SCOPED_ENUM(ConnectionClose, Forcefully)); + current.con->close(ConnectionClose::Forcefully); // Deactivate the OA. This ensures that the subscribers // that have subscribed with oneway QoS will be booted. current.adapter->deactivate(); _count = _total; { - IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync2(_remainingMutex); - --_remaining; - if(_remaining == 0) + if(--_remaining == 0) { _communicator->shutdown(); } @@ -174,30 +160,28 @@ public: } } - static IceUtil::Mutex* _remainingMutex; - private: - static int _remaining; - bool _done; + static atomic_int _remaining; + bool _done = false; + random_device _rd; }; -IceUtil::Mutex* ErraticEventI::_remainingMutex = 0; -int ErraticEventI::_remaining = 0; +atomic_int ErraticEventI::_remaining = 0; -class MaxQueueEventI : public EventI +class MaxQueueEventI final : public EventI { public: - MaxQueueEventI(const CommunicatorPtr& communicator, int expected, int total, bool removeSubscriber) : - EventI(communicator, total), _removeSubscriber(removeSubscriber), _expected(expected) + MaxQueueEventI(shared_ptr<Communicator> communicator, int expected, int total, bool removeSubscriber) : + EventI(move(communicator), total), _removeSubscriber(removeSubscriber), _expected(expected) { } - virtual void - pub(int counter, const Ice::Current&) + void + pub(int counter, const Ice::Current&) override { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); if(counter != _count) { @@ -221,8 +205,8 @@ public: } } - virtual void - check(const Subscription& subscription) + void + check(const Subscription& subscription) override { if(_removeSubscriber) { @@ -236,7 +220,7 @@ public: while(--nRetry > 0) { subscription.publisher->ice_ping(); - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(200)); + this_thread::sleep_for(200ms); } test(false); } @@ -252,19 +236,19 @@ private: int _expected; }; -class ControllerEventI: public EventI +class ControllerEventI final : public EventI { public: - ControllerEventI(const CommunicatorPtr& communicator, int total, const Ice::ObjectAdapterPtr& adapter) : - EventI(communicator, total), _adapter(adapter) + ControllerEventI(shared_ptr<Communicator> communicator, int total, shared_ptr<ObjectAdapter> adapter) : + EventI(move(communicator), total), _adapter(move(adapter)) { } - virtual void - pub(int, const Ice::Current&) + void + pub(int, const Ice::Current&) override { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); if(++_count == _total) { _adapter->activate(); @@ -273,37 +257,14 @@ public: private: - const Ice::ObjectAdapterPtr _adapter; -}; - -namespace -{ - -class Init -{ -public: - - Init() - { - ErraticEventI::_remainingMutex = new IceUtil::Mutex; - } - - ~Init() - { - delete ErraticEventI::_remainingMutex; - ErraticEventI::_remainingMutex = 0; - } + const shared_ptr<Ice::ObjectAdapter> _adapter; }; -Init init; - -} - -class Subscriber : public Test::TestHelper +class Subscriber final : public Test::TestHelper { public: - void run(int, char**); + void run(int, char**) override; }; void @@ -345,16 +306,16 @@ Subscriber::run(int argc, char** argv) IceStorm::QoS cmdLineQos; vector<string> sqos = opts.argVec("qos"); - for(vector<string>::const_iterator q = sqos.begin(); q != sqos.end(); ++q) + for(const auto& q: sqos) { - string::size_type off = q->find(","); + string::size_type off = q.find(","); if(off == string::npos) { ostringstream os; os << argv[0] << ": parse error: no , in QoS"; throw invalid_argument(os.str()); } - cmdLineQos[q->substr(0, off)] = q->substr(off+1); + cmdLineQos[q.substr(0, off)] = q.substr(off+1); } bool slow = opts.isSet("slow"); @@ -375,8 +336,8 @@ Subscriber::run(int argc, char** argv) throw invalid_argument(os.str()); } - PropertiesPtr properties = communicator->getProperties(); - const char* managerProxyProperty = "IceStormAdmin.TopicManager.Default"; + auto properties = communicator->getProperties(); + string managerProxyProperty = "IceStormAdmin.TopicManager.Default"; string managerProxy = properties->getProperty(managerProxyProperty); if(managerProxy.empty()) { @@ -385,7 +346,7 @@ Subscriber::run(int argc, char** argv) throw invalid_argument(os.str()); } - IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( + auto manager = checkedCast<IceStorm::TopicManagerPrx>( communicator->stringToProxy(managerProxy)); if(!manager) { @@ -404,7 +365,7 @@ Subscriber::run(int argc, char** argv) os << "SubscriberAdapter" << i; Subscription item; item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default"); - item.servant = new ErraticEventI(communicator.communicator(), events); + item.servant = make_shared<ErraticEventI>(communicator.communicator(), events); item.qos["reliability"] = "twoway"; subs.push_back(item); } @@ -413,7 +374,7 @@ Subscriber::run(int argc, char** argv) { Subscription item; item.adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default"); - item.servant = new SlowEventI(communicator.communicator(), events); + item.servant = make_shared<SlowEventI>(communicator.communicator(), events); item.qos = cmdLineQos; subs.push_back(item); } @@ -423,11 +384,11 @@ Subscriber::run(int argc, char** argv) item1.adapter = communicator->createObjectAdapterWithEndpoints("MaxQueueAdapter", "default"); if(maxQueueDropEvents) { - item1.servant = new MaxQueueEventI(communicator.communicator(), maxQueueDropEvents, events, false); + item1.servant = make_shared<MaxQueueEventI>(communicator.communicator(), maxQueueDropEvents, events, false); } else { - item1.servant = new MaxQueueEventI(communicator.communicator(), maxQueueRemoveSub, events, true); + item1.servant = make_shared<MaxQueueEventI>(communicator.communicator(), maxQueueRemoveSub, events, true); } item1.qos = cmdLineQos; item1.activate = false; @@ -435,7 +396,7 @@ Subscriber::run(int argc, char** argv) Subscription item2; item2.adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default"); - item2.servant = new ControllerEventI(communicator.communicator(), events, item1.adapter); + item2.servant = make_shared<ControllerEventI>(communicator.communicator(), events, item1.adapter); item2.qos["reliability"] = "oneway"; subs.push_back(item2); } @@ -444,29 +405,29 @@ Subscriber::run(int argc, char** argv) Subscription item; item.adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default"); item.qos = cmdLineQos; - map<string, string>::const_iterator p = item.qos.find("reliability"); + auto p = item.qos.find("reliability"); if(p != item.qos.end() && p->second == "ordered") { - item.servant = new OrderEventI(communicator.communicator(), events); + item.servant = make_shared<OrderEventI>(communicator.communicator(), events); } else { - item.servant = new CountEventI(communicator.communicator(), events); + item.servant = make_shared<CountEventI>(communicator.communicator(), events); } subs.push_back(item); } - TopicPrx topic = manager->retrieve("fed1"); + auto topic = manager->retrieve("fed1"); { - for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) + for(auto& p: subs) { - p->obj = p->adapter->addWithUUID(p->servant); + p.obj = p.adapter->addWithUUID(p.servant); IceStorm::QoS qos; string reliability = ""; - IceStorm::QoS::const_iterator q = p->qos.find("reliability"); - if(q != p->qos.end()) + IceStorm::QoS::const_iterator q = p.qos.find("reliability"); + if(q != p.qos.end()) { reliability = q->second; } @@ -480,22 +441,22 @@ Subscriber::run(int argc, char** argv) } else if(reliability == "batch") { - p->obj = p->obj->ice_batchOneway(); + p.obj = p.obj->ice_batchOneway(); } else //if(reliability == "oneway") { - p->obj = p->obj->ice_oneway(); + p.obj = p.obj->ice_oneway(); } - p->publisher = topic->subscribeAndGetPublisher(qos, p->obj); + p.publisher = topic->subscribeAndGetPublisher(qos, p.obj); } } { - for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) + for(const auto& p: subs) { - if(p->activate) + if(p.activate) { - p->adapter->activate(); + p.adapter->activate(); } } } @@ -503,14 +464,14 @@ Subscriber::run(int argc, char** argv) communicator->waitForShutdown(); { - for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p) + for(const auto& p: subs) { - p->servant->check(*p); - topic->unsubscribe(p->obj); - if(p->servant->count() != events) + p.servant->check(p); + topic->unsubscribe(p.obj); + if(p.servant->count() != events) { ostringstream os; - os << "expected " << events << " events but got " << p->servant->count() << " events."; + os << "expected " << events << " events but got " << p.servant->count() << " events."; throw invalid_argument(os.str()); } } |