summaryrefslogtreecommitdiff
path: root/cpp/test/IceStorm/single/Subscriber.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-02-28 08:57:05 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-02-28 08:57:05 +0000
commit789e38b1ac0c03a19c5a9a2d74326236d3e0f482 (patch)
treecc9ba6cf6d1e3e383d0f112531881d2cedeefbbd /cpp/test/IceStorm/single/Subscriber.cpp
parentFixed text for new throughput demo. (diff)
downloadice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.tar.bz2
ice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.tar.xz
ice-789e38b1ac0c03a19c5a9a2d74326236d3e0f482.zip
Added support for twoway delivery mode.
Diffstat (limited to 'cpp/test/IceStorm/single/Subscriber.cpp')
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp90
1 files changed, 76 insertions, 14 deletions
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index 1152bc36a3c..eafd6ced5b5 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -10,6 +10,7 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Single.h>
+#include <TestCommon.h>
#include <fcntl.h>
#ifdef _WIN32
@@ -24,31 +25,60 @@ using namespace Ice;
using namespace IceStorm;
using namespace Test;
-class SingleI : public Single, public IceUtil::Mutex
+class SingleI : public Single, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
- SingleI(const CommunicatorPtr& communicator) :
+ SingleI(const CommunicatorPtr& communicator, const string& name, bool ordered = false) :
_communicator(communicator),
- _count(0)
+ _name(name),
+ _count(0),
+ _ordered(ordered),
+ _last(0)
{
}
- virtual void event(const Current&)
+ virtual void event(int i, const Current&)
{
- IceUtil::Mutex::Lock sync(*this);
+ Lock sync(*this);
- if(++_count == 10)
+ if(_ordered && i != _last)
{
- _communicator->shutdown();
+ cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last;
+ test(false);
+ }
+ ++_last;
+
+ if(++_count == 1000)
+ {
+ notify();
}
}
+ virtual void
+ waitForEvents()
+ {
+ Lock sync(*this);
+ cout << "testing " << _name << " reliability... " << flush;
+ while(_count < 1000)
+ {
+ if(!timedWait(IceUtil::Time::seconds(10)))
+ {
+ test(false);
+ }
+ }
+ cout << "ok" << endl;
+ }
+
private:
CommunicatorPtr _communicator;
+ const string _name;
int _count;
+ bool _ordered;
+ int _last;
};
+typedef IceUtil::Handle<SingleI> SingleIPtr;
void
createLock(const string& name)
@@ -102,15 +132,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
}
ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SingleAdapter", "default");
- ObjectPtr single = new SingleI(communicator);
- ObjectPrx object = adapter->addWithUUID(single);
- IceStorm::QoS qos;
- //TODO: qos["reliability"] = "batch";
+ TopicPrx topic;
try
{
- TopicPrx topic = manager->retrieve("single");
- topic->subscribe(qos, object);
+ topic = manager->retrieve("single");
}
catch(const IceStorm::NoSuchTopic& e)
{
@@ -118,9 +144,45 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
return EXIT_FAILURE;
}
+ //
+ // Create subscribers with different QoS.
+ //
+ vector<SingleIPtr> subscribers;
+ {
+ subscribers.push_back(new SingleI(communicator, "default"));
+ topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back()));
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "oneway"));
+ IceStorm::QoS qos;
+ qos["reliability"] = "oneway";
+ topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "twoway"));
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway";
+ topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "batch"));
+ IceStorm::QoS qos;
+ qos["reliability"] = "batch";
+ topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "twoway ordered", true)); // Ordered
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway-ordered";
+ topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
+ }
+
adapter->activate();
- communicator->waitForShutdown();
+ for(vector<SingleIPtr>::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
+ {
+ (*p)->waitForEvents();
+ }
deleteLock(lockfile);