diff options
author | Benoit Foucher <benoit@zeroc.com> | 2016-09-22 16:47:36 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2016-09-22 16:47:36 +0200 |
commit | 7b5454f97092c7a3952d4981bb057f1bd7d61993 (patch) | |
tree | d26dadf7fa19ef8fb9ed92851c1cf5cc179fc908 /cpp/test/IceStorm/stress/Subscriber.cpp | |
parent | Slice/unicodePaths fails in Sles12 (diff) | |
download | ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.bz2 ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.xz ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.zip |
Fixed ICE-7032 - support for limiting subscriber queue size
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r-- | cpp/test/IceStorm/stress/Subscriber.cpp | 145 |
1 files changed, 135 insertions, 10 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp index daa63ec5d75..e955bada99a 100644 --- a/cpp/test/IceStorm/stress/Subscriber.cpp +++ b/cpp/test/IceStorm/stress/Subscriber.cpp @@ -23,6 +23,8 @@ using namespace Ice; using namespace IceStorm; using namespace Test; +struct Subscription; // Forward declaration. + class EventI : public Event, public IceUtil::Mutex { public: @@ -38,6 +40,10 @@ public: return _count; } + virtual void check(const Subscription&) + { + } + protected: const CommunicatorPtr _communicator; @@ -46,6 +52,20 @@ protected: }; typedef IceUtil::Handle<EventI> EventIPtr; +struct Subscription +{ + Subscription() : activate(true) + { + } + + Ice::ObjectAdapterPtr adapter; + Ice::ObjectPrx obj; + EventIPtr servant; + IceStorm::QoS qos; + Ice::ObjectPrx publisher; + bool activate; +}; + class OrderEventI : public EventI { public: @@ -170,6 +190,89 @@ private: IceUtil::Mutex* ErraticEventI::_remainingMutex = 0; int ErraticEventI::_remaining = 0; +class MaxQueueEventI : public EventI +{ +public: + + MaxQueueEventI(const CommunicatorPtr& communicator, int expected, int total, bool removeSubscriber) : + EventI(communicator, total), _removeSubscriber(removeSubscriber), _expected(expected) + { + } + + virtual void + pub(int counter, const Ice::Current&) + { + Lock sync(*this); + + if(counter != _count) + { + cerr << "failed! expected event: " << _count << " received event: " << counter << endl; + } + + if(_removeSubscriber) + { + _count = _total; + _communicator->shutdown(); + return; + } + + if(_count == 0) + { + _count = _total - _expected; + } + else if(++_count == _total) + { + _communicator->shutdown(); + } + } + + virtual void + check(const Subscription& subscription) + { + if(_removeSubscriber) + { + try + { + subscription.publisher->ice_ping(); + test(false); + } + catch(const Ice::ObjectNotExistException&) + { + } + } + } + +private: + + bool _removeSubscriber; + int _expected; +}; + + +class ControllerEventI: public EventI +{ +public: + + ControllerEventI(const CommunicatorPtr& communicator, int total, const Ice::ObjectAdapterPtr& adapter) : + EventI(communicator, total), _adapter(adapter) + { + } + + virtual void + pub(int, const Ice::Current&) + { + Lock sync(*this); + if(++_count == _total) + { + _adapter->activate(); + } + } + +private: + + const Ice::ObjectAdapterPtr _adapter; +}; + namespace { @@ -193,14 +296,6 @@ Init init; } -struct Subscription -{ - Ice::ObjectAdapterPtr adapter; - Ice::ObjectPrx obj; - EventIPtr servant; - IceStorm::QoS qos; -}; - int run(int argc, char* argv[], const CommunicatorPtr& communicator) { @@ -209,6 +304,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) opts.addOpt("", "qos", IceUtilInternal::Options::NeedArg, "", IceUtilInternal::Options::Repeat); opts.addOpt("", "slow"); opts.addOpt("", "erratic", IceUtilInternal::Options::NeedArg); + opts.addOpt("", "maxQueueDropEvents", IceUtilInternal::Options::NeedArg); + opts.addOpt("", "maxQueueRemoveSub", IceUtilInternal::Options::NeedArg); try { @@ -247,6 +344,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) } bool slow = opts.isSet("slow"); + int maxQueueDropEvents = opts.isSet("maxQueueDropEvents") ? atoi(opts.optArg("maxQueueDropEvents").c_str()) : 0; + int maxQueueRemoveSub = opts.isSet("maxQueueRemoveSub") ? atoi(opts.optArg("maxQueueRemoveSub").c_str()) : 0; bool erratic = false; int erraticNum = 0; s = opts.optArg("erratic"); @@ -301,6 +400,28 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) item.qos = cmdLineQos; subs.push_back(item); } + else if(maxQueueDropEvents || maxQueueRemoveSub) + { + Subscription item1; + item1.adapter = communicator->createObjectAdapterWithEndpoints("MaxQueueAdapter", "default"); + if(maxQueueDropEvents) + { + item1.servant = new MaxQueueEventI(communicator, maxQueueDropEvents, events, false); + } + else + { + item1.servant = new MaxQueueEventI(communicator, maxQueueRemoveSub, events, true); + } + item1.qos = cmdLineQos; + item1.activate = false; + subs.push_back(item1); + + Subscription item2; + item2.adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default"); + item2.servant = new ControllerEventI(communicator, events, item1.adapter); + item2.qos = cmdLineQos; + subs.push_back(item2); + } else { Subscription item; @@ -357,14 +478,17 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) { p->obj = p->obj->ice_oneway(); } - topic->subscribeAndGetPublisher(qos, p->obj); + p->publisher = topic->subscribeAndGetPublisher(qos, p->obj); } } { for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) { - p->adapter->activate(); + if(p->activate) + { + p->adapter->activate(); + } } } @@ -373,6 +497,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) { for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p) { + p->servant->check(*p); topic->unsubscribe(p->obj); if(p->servant->count() != events) { |