diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-02-28 08:57:05 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-02-28 08:57:05 +0000 |
commit | 789e38b1ac0c03a19c5a9a2d74326236d3e0f482 (patch) | |
tree | cc9ba6cf6d1e3e383d0f112531881d2cedeefbbd /cpp/test/IceStorm/single/Subscriber.cpp | |
parent | Fixed text for new throughput demo. (diff) | |
download | ice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.tar.bz2 ice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.tar.xz ice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.zip |
Added support for twoway delivery mode.
Diffstat (limited to 'cpp/test/IceStorm/single/Subscriber.cpp')
-rw-r--r-- | cpp/test/IceStorm/single/Subscriber.cpp | 90 |
1 files changed, 76 insertions, 14 deletions
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp index 1152bc36a3c..eafd6ced5b5 100644 --- a/cpp/test/IceStorm/single/Subscriber.cpp +++ b/cpp/test/IceStorm/single/Subscriber.cpp @@ -10,6 +10,7 @@ #include <Ice/Ice.h> #include <IceStorm/IceStorm.h> #include <Single.h> +#include <TestCommon.h> #include <fcntl.h> #ifdef _WIN32 @@ -24,31 +25,60 @@ using namespace Ice; using namespace IceStorm; using namespace Test; -class SingleI : public Single, public IceUtil::Mutex +class SingleI : public Single, public IceUtil::Monitor<IceUtil::Mutex> { public: - SingleI(const CommunicatorPtr& communicator) : + SingleI(const CommunicatorPtr& communicator, const string& name, bool ordered = false) : _communicator(communicator), - _count(0) + _name(name), + _count(0), + _ordered(ordered), + _last(0) { } - virtual void event(const Current&) + virtual void event(int i, const Current&) { - IceUtil::Mutex::Lock sync(*this); + Lock sync(*this); - if(++_count == 10) + if(_ordered && i != _last) { - _communicator->shutdown(); + cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last; + test(false); + } + ++_last; + + if(++_count == 1000) + { + notify(); } } + virtual void + waitForEvents() + { + Lock sync(*this); + cout << "testing " << _name << " reliability... " << flush; + while(_count < 1000) + { + if(!timedWait(IceUtil::Time::seconds(10))) + { + test(false); + } + } + cout << "ok" << endl; + } + private: CommunicatorPtr _communicator; + const string _name; int _count; + bool _ordered; + int _last; }; +typedef IceUtil::Handle<SingleI> SingleIPtr; void createLock(const string& name) @@ -102,15 +132,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) } ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default"); - ObjectPtr single = new SingleI(communicator); - ObjectPrx object = adapter->addWithUUID(single); - IceStorm::QoS qos; - //TODO: qos["reliability"] = "batch"; + TopicPrx topic; try { - TopicPrx topic = manager->retrieve("single"); - topic->subscribe(qos, object); + topic = manager->retrieve("single"); } catch(const IceStorm::NoSuchTopic& e) { @@ -118,9 +144,45 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) return EXIT_FAILURE; } + // + // Create subscribers with different QoS. + // + vector<SingleIPtr> subscribers; + { + subscribers.push_back(new SingleI(communicator, "default")); + topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())); + } + { + subscribers.push_back(new SingleI(communicator, "oneway")); + IceStorm::QoS qos; + qos["reliability"] = "oneway"; + topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); + } + { + subscribers.push_back(new SingleI(communicator, "twoway")); + IceStorm::QoS qos; + qos["reliability"] = "twoway"; + topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); + } + { + subscribers.push_back(new SingleI(communicator, "batch")); + IceStorm::QoS qos; + qos["reliability"] = "batch"; + topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); + } + { + subscribers.push_back(new SingleI(communicator, "twoway ordered", true)); // Ordered + IceStorm::QoS qos; + qos["reliability"] = "twoway-ordered"; + topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); + } + adapter->activate(); - communicator->waitForShutdown(); + for(vector<SingleIPtr>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p) + { + (*p)->waitForEvents(); + } deleteLock(lockfile); |