summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/stress/Subscriber.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2016-09-22 16:47:36 +0200
committerBenoit Foucher <benoit@zeroc.com>2016-09-22 16:47:36 +0200
commit7b5454f97092c7a3952d4981bb057f1bd7d61993 (patch)
treed26dadf7fa19ef8fb9ed92851c1cf5cc179fc908 /cpp/test/IceStorm/stress/Subscriber.cpp
parentSlice/unicodePaths fails in Sles12 (diff)
downloadice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.bz2
ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.tar.xz
ice-7b5454f97092c7a3952d4981bb057f1bd7d61993.zip
Fixed ICE-7032 - support for limiting subscriber queue size
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp145
1 files changed, 135 insertions, 10 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
index daa63ec5d75..e955bada99a 100644
--- a/cpp/test/IceStorm/stress/Subscriber.cpp
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -23,6 +23,8 @@ using namespace Ice;
using namespace IceStorm;
using namespace Test;
+struct Subscription; // Forward declaration.
+
class EventI : public Event, public IceUtil::Mutex
{
public:
@@ -38,6 +40,10 @@ public:
return _count;
}
+ virtual void check(const Subscription&)
+ {
+ }
+
protected:
const CommunicatorPtr _communicator;
@@ -46,6 +52,20 @@ protected:
};
typedef IceUtil::Handle<EventI> EventIPtr;
+struct Subscription
+{
+ Subscription() : activate(true)
+ {
+ }
+
+ Ice::ObjectAdapterPtr adapter;
+ Ice::ObjectPrx obj;
+ EventIPtr servant;
+ IceStorm::QoS qos;
+ Ice::ObjectPrx publisher;
+ bool activate;
+};
+
class OrderEventI : public EventI
{
public:
@@ -170,6 +190,89 @@ private:
IceUtil::Mutex* ErraticEventI::_remainingMutex = 0;
int ErraticEventI::_remaining = 0;
+class MaxQueueEventI : public EventI
+{
+public:
+
+ MaxQueueEventI(const CommunicatorPtr& communicator, int expected, int total, bool removeSubscriber) :
+ EventI(communicator, total), _removeSubscriber(removeSubscriber), _expected(expected)
+ {
+ }
+
+ virtual void
+ pub(int counter, const Ice::Current&)
+ {
+ Lock sync(*this);
+
+ if(counter != _count)
+ {
+ cerr << "failed! expected event: " << _count << " received event: " << counter << endl;
+ }
+
+ if(_removeSubscriber)
+ {
+ _count = _total;
+ _communicator->shutdown();
+ return;
+ }
+
+ if(_count == 0)
+ {
+ _count = _total - _expected;
+ }
+ else if(++_count == _total)
+ {
+ _communicator->shutdown();
+ }
+ }
+
+ virtual void
+ check(const Subscription& subscription)
+ {
+ if(_removeSubscriber)
+ {
+ try
+ {
+ subscription.publisher->ice_ping();
+ test(false);
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ }
+ }
+ }
+
+private:
+
+ bool _removeSubscriber;
+ int _expected;
+};
+
+
+class ControllerEventI: public EventI
+{
+public:
+
+ ControllerEventI(const CommunicatorPtr& communicator, int total, const Ice::ObjectAdapterPtr& adapter) :
+ EventI(communicator, total), _adapter(adapter)
+ {
+ }
+
+ virtual void
+ pub(int, const Ice::Current&)
+ {
+ Lock sync(*this);
+ if(++_count == _total)
+ {
+ _adapter->activate();
+ }
+ }
+
+private:
+
+ const Ice::ObjectAdapterPtr _adapter;
+};
+
namespace
{
@@ -193,14 +296,6 @@ Init init;
}
-struct Subscription
-{
- Ice::ObjectAdapterPtr adapter;
- Ice::ObjectPrx obj;
- EventIPtr servant;
- IceStorm::QoS qos;
-};
-
int
run(int argc, char* argv[], const CommunicatorPtr& communicator)
{
@@ -209,6 +304,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
opts.addOpt("", "qos", IceUtilInternal::Options::NeedArg, "", IceUtilInternal::Options::Repeat);
opts.addOpt("", "slow");
opts.addOpt("", "erratic", IceUtilInternal::Options::NeedArg);
+ opts.addOpt("", "maxQueueDropEvents", IceUtilInternal::Options::NeedArg);
+ opts.addOpt("", "maxQueueRemoveSub", IceUtilInternal::Options::NeedArg);
try
{
@@ -247,6 +344,8 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
}
bool slow = opts.isSet("slow");
+ int maxQueueDropEvents = opts.isSet("maxQueueDropEvents") ? atoi(opts.optArg("maxQueueDropEvents").c_str()) : 0;
+ int maxQueueRemoveSub = opts.isSet("maxQueueRemoveSub") ? atoi(opts.optArg("maxQueueRemoveSub").c_str()) : 0;
bool erratic = false;
int erraticNum = 0;
s = opts.optArg("erratic");
@@ -301,6 +400,28 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
item.qos = cmdLineQos;
subs.push_back(item);
}
+ else if(maxQueueDropEvents || maxQueueRemoveSub)
+ {
+ Subscription item1;
+ item1.adapter = communicator->createObjectAdapterWithEndpoints("MaxQueueAdapter", "default");
+ if(maxQueueDropEvents)
+ {
+ item1.servant = new MaxQueueEventI(communicator, maxQueueDropEvents, events, false);
+ }
+ else
+ {
+ item1.servant = new MaxQueueEventI(communicator, maxQueueRemoveSub, events, true);
+ }
+ item1.qos = cmdLineQos;
+ item1.activate = false;
+ subs.push_back(item1);
+
+ Subscription item2;
+ item2.adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default");
+ item2.servant = new ControllerEventI(communicator, events, item1.adapter);
+ item2.qos = cmdLineQos;
+ subs.push_back(item2);
+ }
else
{
Subscription item;
@@ -357,14 +478,17 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
{
p->obj = p->obj->ice_oneway();
}
- topic->subscribeAndGetPublisher(qos, p->obj);
+ p->publisher = topic->subscribeAndGetPublisher(qos, p->obj);
}
}
{
for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
{
- p->adapter->activate();
+ if(p->activate)
+ {
+ p->adapter->activate();
+ }
}
}
@@ -373,6 +497,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
{
for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p)
{
+ p->servant->check(*p);
topic->unsubscribe(p->obj);
if(p->servant->count() != events)
{