summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/stress/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceStorm/stress/Subscriber.cpp')
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp231
1 files changed, 96 insertions, 135 deletions
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
index 77d5dd9e029..c7e3cdd3c2b 100644
--- a/cpp/test/IceStorm/stress/Subscriber.cpp
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -5,33 +5,29 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <IceUtil/Options.h>
-#include <IceUtil/Thread.h>
-#include <IceUtil/Time.h>
-#include <IceUtil/Random.h>
#include <Event.h>
-#include <IceUtil/Mutex.h>
-#include <IceUtil/MutexPtrLock.h>
#include <TestHelper.h>
+#include <random>
using namespace std;
using namespace Ice;
using namespace IceStorm;
using namespace Test;
-struct Subscription; // Forward declaration.
+struct Subscription;
-class EventI : public Event, public IceUtil::Mutex
+class EventI : public Event
{
public:
- EventI(const CommunicatorPtr& communicator, int total) :
- _communicator(communicator), _total(total), _count(0)
+ EventI(shared_ptr<Communicator> communicator, int total) :
+ _communicator(move(communicator)), _total(total), _count(0)
{
}
- int count() const
+ int count()
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
return _count;
}
@@ -41,41 +37,36 @@ public:
protected:
- const CommunicatorPtr _communicator;
+ shared_ptr<Communicator> _communicator;
const int _total;
int _count;
+ mutex _mutex;
};
-typedef IceUtil::Handle<EventI> EventIPtr;
-struct Subscription
+struct Subscription final
{
- Subscription() : activate(true)
- {
- }
-
- Ice::ObjectAdapterPtr adapter;
- Ice::ObjectPrx obj;
- EventIPtr servant;
+ shared_ptr<Ice::ObjectAdapter> adapter;
+ shared_ptr<Ice::ObjectPrx> obj;
+ shared_ptr<EventI> servant;
IceStorm::QoS qos;
- Ice::ObjectPrx publisher;
- bool activate;
+ shared_ptr<Ice::ObjectPrx> publisher;
+ bool activate = true;
};
-class OrderEventI : public EventI
+class OrderEventI final : public EventI
{
public:
- OrderEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ OrderEventI(shared_ptr<Communicator> communicator, int total) :
+ EventI(move(communicator), total)
{
}
- virtual void
- pub(int counter, const Ice::Current&)
+ void
+ pub(int counter, const Ice::Current&) override
{
- Lock sync(*this);
-
- if(counter != _count || counter == _total-1)
+ lock_guard<mutex> lg(_mutex);
+ if(counter != _count || counter == _total - 1)
{
if(counter != _count)
{
@@ -87,20 +78,19 @@ public:
}
};
-class CountEventI : public EventI
+class CountEventI final : public EventI
{
public:
- CountEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ CountEventI(shared_ptr<Communicator> communicator, int total) :
+ EventI(move(communicator), total)
{
}
- virtual void
- pub(int, const Ice::Current&)
+ void
+ pub(int, const Ice::Current&) override
{
- Lock sync(*this);
-
+ lock_guard<mutex> lg(_mutex);
if(++_count == _total)
{
_communicator->shutdown();
@@ -108,20 +98,19 @@ public:
}
};
-class SlowEventI : public EventI
+class SlowEventI final : public EventI
{
public:
- SlowEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total)
+ SlowEventI(shared_ptr<Communicator> communicator, int total) :
+ EventI(move(communicator), total)
{
}
- virtual void
- pub(int, const Ice::Current&)
+ void
+ pub(int, const Ice::Current&) override
{
- Lock sync(*this);
-
+ lock_guard<mutex> lg(_mutex);
//
// Ignore events over and above the expected.
//
@@ -130,7 +119,7 @@ public:
return;
}
// Sleep for 3 seconds
- IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(3));
+ this_thread::sleep_for(3s);
if(++_count == _total)
{
_communicator->shutdown();
@@ -138,35 +127,32 @@ public:
}
};
-class ErraticEventI : public EventI
+class ErraticEventI final : public EventI
{
public:
- ErraticEventI(const CommunicatorPtr& communicator, int total) :
- EventI(communicator, total), _done(false)
+ ErraticEventI(shared_ptr<Communicator> communicator, int total) :
+ EventI(move(communicator), total)
{
- IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync(_remainingMutex);
++_remaining;
}
- virtual void
- pub(int, const Ice::Current& current)
+ void
+ pub(int, const Ice::Current& current) override
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
// Randomly close the connection.
- if(!_done && (IceUtilInternal::random(10) == 1 || ++_count == _total))
+ if(!_done && (_rd() % 10 == 1 || ++_count == _total))
{
_done = true;
- current.con->close(ICE_SCOPED_ENUM(ConnectionClose, Forcefully));
+ current.con->close(ConnectionClose::Forcefully);
// Deactivate the OA. This ensures that the subscribers
// that have subscribed with oneway QoS will be booted.
current.adapter->deactivate();
_count = _total;
{
- IceUtilInternal::MutexPtrLock<IceUtil::Mutex> sync2(_remainingMutex);
- --_remaining;
- if(_remaining == 0)
+ if(--_remaining == 0)
{
_communicator->shutdown();
}
@@ -174,30 +160,28 @@ public:
}
}
- static IceUtil::Mutex* _remainingMutex;
-
private:
- static int _remaining;
- bool _done;
+ static atomic_int _remaining;
+ bool _done = false;
+ random_device _rd;
};
-IceUtil::Mutex* ErraticEventI::_remainingMutex = 0;
-int ErraticEventI::_remaining = 0;
+atomic_int ErraticEventI::_remaining = 0;
-class MaxQueueEventI : public EventI
+class MaxQueueEventI final : public EventI
{
public:
- MaxQueueEventI(const CommunicatorPtr& communicator, int expected, int total, bool removeSubscriber) :
- EventI(communicator, total), _removeSubscriber(removeSubscriber), _expected(expected)
+ MaxQueueEventI(shared_ptr<Communicator> communicator, int expected, int total, bool removeSubscriber) :
+ EventI(move(communicator), total), _removeSubscriber(removeSubscriber), _expected(expected)
{
}
- virtual void
- pub(int counter, const Ice::Current&)
+ void
+ pub(int counter, const Ice::Current&) override
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
if(counter != _count)
{
@@ -221,8 +205,8 @@ public:
}
}
- virtual void
- check(const Subscription& subscription)
+ void
+ check(const Subscription& subscription) override
{
if(_removeSubscriber)
{
@@ -236,7 +220,7 @@ public:
while(--nRetry > 0)
{
subscription.publisher->ice_ping();
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(200));
+ this_thread::sleep_for(200ms);
}
test(false);
}
@@ -252,19 +236,19 @@ private:
int _expected;
};
-class ControllerEventI: public EventI
+class ControllerEventI final : public EventI
{
public:
- ControllerEventI(const CommunicatorPtr& communicator, int total, const Ice::ObjectAdapterPtr& adapter) :
- EventI(communicator, total), _adapter(adapter)
+ ControllerEventI(shared_ptr<Communicator> communicator, int total, shared_ptr<ObjectAdapter> adapter) :
+ EventI(move(communicator), total), _adapter(move(adapter))
{
}
- virtual void
- pub(int, const Ice::Current&)
+ void
+ pub(int, const Ice::Current&) override
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
if(++_count == _total)
{
_adapter->activate();
@@ -273,37 +257,14 @@ public:
private:
- const Ice::ObjectAdapterPtr _adapter;
-};
-
-namespace
-{
-
-class Init
-{
-public:
-
- Init()
- {
- ErraticEventI::_remainingMutex = new IceUtil::Mutex;
- }
-
- ~Init()
- {
- delete ErraticEventI::_remainingMutex;
- ErraticEventI::_remainingMutex = 0;
- }
+ const shared_ptr<Ice::ObjectAdapter> _adapter;
};
-Init init;
-
-}
-
-class Subscriber : public Test::TestHelper
+class Subscriber final : public Test::TestHelper
{
public:
- void run(int, char**);
+ void run(int, char**) override;
};
void
@@ -345,16 +306,16 @@ Subscriber::run(int argc, char** argv)
IceStorm::QoS cmdLineQos;
vector<string> sqos = opts.argVec("qos");
- for(vector<string>::const_iterator q = sqos.begin(); q != sqos.end(); ++q)
+ for(const auto& q: sqos)
{
- string::size_type off = q->find(",");
+ string::size_type off = q.find(",");
if(off == string::npos)
{
ostringstream os;
os << argv[0] << ": parse error: no , in QoS";
throw invalid_argument(os.str());
}
- cmdLineQos[q->substr(0, off)] = q->substr(off+1);
+ cmdLineQos[q.substr(0, off)] = q.substr(off+1);
}
bool slow = opts.isSet("slow");
@@ -375,8 +336,8 @@ Subscriber::run(int argc, char** argv)
throw invalid_argument(os.str());
}
- PropertiesPtr properties = communicator->getProperties();
- const char* managerProxyProperty = "IceStormAdmin.TopicManager.Default";
+ auto properties = communicator->getProperties();
+ string managerProxyProperty = "IceStormAdmin.TopicManager.Default";
string managerProxy = properties->getProperty(managerProxyProperty);
if(managerProxy.empty())
{
@@ -385,7 +346,7 @@ Subscriber::run(int argc, char** argv)
throw invalid_argument(os.str());
}
- IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
+ auto manager = checkedCast<IceStorm::TopicManagerPrx>(
communicator->stringToProxy(managerProxy));
if(!manager)
{
@@ -404,7 +365,7 @@ Subscriber::run(int argc, char** argv)
os << "SubscriberAdapter" << i;
Subscription item;
item.adapter = communicator->createObjectAdapterWithEndpoints(os.str(), "default");
- item.servant = new ErraticEventI(communicator.communicator(), events);
+ item.servant = make_shared<ErraticEventI>(communicator.communicator(), events);
item.qos["reliability"] = "twoway";
subs.push_back(item);
}
@@ -413,7 +374,7 @@ Subscriber::run(int argc, char** argv)
{
Subscription item;
item.adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default");
- item.servant = new SlowEventI(communicator.communicator(), events);
+ item.servant = make_shared<SlowEventI>(communicator.communicator(), events);
item.qos = cmdLineQos;
subs.push_back(item);
}
@@ -423,11 +384,11 @@ Subscriber::run(int argc, char** argv)
item1.adapter = communicator->createObjectAdapterWithEndpoints("MaxQueueAdapter", "default");
if(maxQueueDropEvents)
{
- item1.servant = new MaxQueueEventI(communicator.communicator(), maxQueueDropEvents, events, false);
+ item1.servant = make_shared<MaxQueueEventI>(communicator.communicator(), maxQueueDropEvents, events, false);
}
else
{
- item1.servant = new MaxQueueEventI(communicator.communicator(), maxQueueRemoveSub, events, true);
+ item1.servant = make_shared<MaxQueueEventI>(communicator.communicator(), maxQueueRemoveSub, events, true);
}
item1.qos = cmdLineQos;
item1.activate = false;
@@ -435,7 +396,7 @@ Subscriber::run(int argc, char** argv)
Subscription item2;
item2.adapter = communicator->createObjectAdapterWithEndpoints("ControllerAdapter", "default");
- item2.servant = new ControllerEventI(communicator.communicator(), events, item1.adapter);
+ item2.servant = make_shared<ControllerEventI>(communicator.communicator(), events, item1.adapter);
item2.qos["reliability"] = "oneway";
subs.push_back(item2);
}
@@ -444,29 +405,29 @@ Subscriber::run(int argc, char** argv)
Subscription item;
item.adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default");
item.qos = cmdLineQos;
- map<string, string>::const_iterator p = item.qos.find("reliability");
+ auto p = item.qos.find("reliability");
if(p != item.qos.end() && p->second == "ordered")
{
- item.servant = new OrderEventI(communicator.communicator(), events);
+ item.servant = make_shared<OrderEventI>(communicator.communicator(), events);
}
else
{
- item.servant = new CountEventI(communicator.communicator(), events);
+ item.servant = make_shared<CountEventI>(communicator.communicator(), events);
}
subs.push_back(item);
}
- TopicPrx topic = manager->retrieve("fed1");
+ auto topic = manager->retrieve("fed1");
{
- for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
+ for(auto& p: subs)
{
- p->obj = p->adapter->addWithUUID(p->servant);
+ p.obj = p.adapter->addWithUUID(p.servant);
IceStorm::QoS qos;
string reliability = "";
- IceStorm::QoS::const_iterator q = p->qos.find("reliability");
- if(q != p->qos.end())
+ IceStorm::QoS::const_iterator q = p.qos.find("reliability");
+ if(q != p.qos.end())
{
reliability = q->second;
}
@@ -480,22 +441,22 @@ Subscriber::run(int argc, char** argv)
}
else if(reliability == "batch")
{
- p->obj = p->obj->ice_batchOneway();
+ p.obj = p.obj->ice_batchOneway();
}
else //if(reliability == "oneway")
{
- p->obj = p->obj->ice_oneway();
+ p.obj = p.obj->ice_oneway();
}
- p->publisher = topic->subscribeAndGetPublisher(qos, p->obj);
+ p.publisher = topic->subscribeAndGetPublisher(qos, p.obj);
}
}
{
- for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
+ for(const auto& p: subs)
{
- if(p->activate)
+ if(p.activate)
{
- p->adapter->activate();
+ p.adapter->activate();
}
}
}
@@ -503,14 +464,14 @@ Subscriber::run(int argc, char** argv)
communicator->waitForShutdown();
{
- for(vector<Subscription>::const_iterator p = subs.begin(); p != subs.end(); ++p)
+ for(const auto& p: subs)
{
- p->servant->check(*p);
- topic->unsubscribe(p->obj);
- if(p->servant->count() != events)
+ p.servant->check(p);
+ topic->unsubscribe(p.obj);
+ if(p.servant->count() != events)
{
ostringstream os;
- os << "expected " << events << " events but got " << p->servant->count() << " events.";
+ os << "expected " << events << " events but got " << p.servant->count() << " events.";
throw invalid_argument(os.str());
}
}