summaryrefslogtreecommitdiff
path: root/cpp/test/IceUtil/condvar/WorkQueue.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2015-03-03 17:30:50 -0500
committerJoe George <joe@zeroc.com>2015-05-12 11:41:55 -0400
commitd35bb9f5c19e34aee31f83d445695a8186ef675e (patch)
treed5324eaf44f5f9776495537c51653f50a66a7237 /cpp/test/IceUtil/condvar/WorkQueue.cpp
downloadice-d35bb9f5c19e34aee31f83d445695a8186ef675e.tar.bz2
ice-d35bb9f5c19e34aee31f83d445695a8186ef675e.tar.xz
ice-d35bb9f5c19e34aee31f83d445695a8186ef675e.zip
Ice 3.4.2 Source Distributionv3.4.2
Diffstat (limited to 'cpp/test/IceUtil/condvar/WorkQueue.cpp')
-rw-r--r--cpp/test/IceUtil/condvar/WorkQueue.cpp310
1 files changed, 310 insertions, 0 deletions
diff --git a/cpp/test/IceUtil/condvar/WorkQueue.cpp b/cpp/test/IceUtil/condvar/WorkQueue.cpp
new file mode 100644
index 00000000000..4fb4cfbddf7
--- /dev/null
+++ b/cpp/test/IceUtil/condvar/WorkQueue.cpp
@@ -0,0 +1,310 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <IceUtil/Thread.h>
+#include <IceUtil/Time.h>
+#include <IceUtil/Monitor.h>
+#include <IceUtil/Options.h>
+
+#include <iostream>
+#include <list>
+
+using namespace std;
+using namespace IceUtil;
+using namespace IceUtilInternal;
+
+class CountDown : public Monitor<Mutex>, public Shared
+{
+public:
+
+ CountDown(int count) : _count(count) { }
+
+ void decrement()
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ assert(_count > 0);
+ --_count;
+ if(_count == 0)
+ {
+ notifyAll();
+ }
+ }
+
+ void waitZero()
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ while(_count > 0)
+ {
+ wait();
+ }
+ }
+
+private:
+
+ int _count;
+};
+typedef Handle<CountDown> CountDownPtr;
+
+class Queue: public Monitor<Mutex>, public Shared
+{
+public:
+
+ Queue(bool broadcast) :
+ _broadcast(broadcast), _terminate(false)
+ {
+ }
+
+ void
+ put(const int& item)
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ _q.push_back(item);
+ if(_broadcast)
+ {
+ notifyAll();
+ }
+ else
+ {
+ notify();
+ }
+ }
+
+ void
+ terminate()
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ _terminate = true;
+ notifyAll();
+ }
+
+ bool
+ timedGet(int& ret, const Time& timeout)
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ if(_q.empty())
+ {
+ timedWait(timeout);
+ }
+
+ // We only report the termination sentinel when the queue is
+ // empty.
+ if(_q.empty())
+ {
+ if(_terminate)
+ {
+ ret = -1;
+ }
+ return false;
+ }
+
+ assert(!_q.empty());
+ ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
+ _q.pop_front();
+ return true;
+ }
+
+ int
+ get()
+ {
+ Monitor<Mutex>::Lock lock(*this);
+ while(_q.empty() && !_terminate)
+ {
+ wait();
+ }
+
+ // We only report the termination sentinel when the queue is
+ // empty.
+ if(_q.empty())
+ {
+ assert(_terminate);
+ return -1;
+ }
+
+ assert(!_q.empty());
+ int ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
+ _q.pop_front();
+ return ret;
+ }
+
+private:
+ const bool _broadcast;
+ bool _terminate;
+ list<int> _q;
+};
+typedef Handle<Queue> QueuePtr;
+
+class TestThread : public Thread
+{
+public:
+
+ TestThread(const CountDownPtr& cd, const QueuePtr& q, bool poll) :
+ _cd(cd), _q(q), _poll(poll)
+ {
+ }
+ virtual void
+ run()
+ {
+ _cd->decrement();
+ while(true)
+ {
+ int res = 0;
+ if(_poll)
+ {
+ _q->timedGet(res, Time::milliSeconds(10));
+ }
+ else
+ {
+ res = _q->get();
+ }
+ if(res == -1)
+ {
+ return;
+ }
+ }
+ }
+
+private:
+ const CountDownPtr _cd;
+ const QueuePtr _q;
+ const bool _poll;
+};
+typedef Handle<TestThread> TestThreadPtr;
+
+class EnqueueThread : public Thread
+{
+public:
+
+ EnqueueThread(const CountDownPtr& cd, const QueuePtr& q, int v) :
+ _cd(cd), _q(q), _v(v)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ _cd->decrement();
+ // Forever
+ if(_v == 0)
+ {
+ while(true)
+ {
+ _q->put(_v++);
+ ThreadControl::yield();
+ }
+ }
+ else
+ {
+ while(_v > 0)
+ {
+ _q->put(_v);
+ --_v;
+ ThreadControl::yield();
+ }
+ }
+ }
+
+private:
+
+ const CountDownPtr _cd;
+ const QueuePtr _q;
+ int _v;
+};
+typedef Handle<EnqueueThread> EnqueueThreadPtr;
+
+int
+main(int argc, char** argv)
+{
+ Options opts;
+ opts.addOpt("n", "events", Options::NeedArg, "5000");
+ opts.addOpt("v", "verbose");
+ try
+ {
+ opts.parse(argc, (const char**)argv);
+ }
+ catch(const IceUtilInternal::BadOptException& e)
+ {
+ cerr << argv[0] << ": " << e.reason << endl;
+ return EXIT_FAILURE;
+ }
+
+ srand(0);
+
+ //
+ // if n = 0 then we'll enqueue forever. Otherwise the test will
+ // terminate after n events have been enqueued by each enqueue
+ // thread.
+ //
+ int n = atoi(opts.optArg("n").c_str());
+
+ cout << "running signal/broadcast timeout test" << flush;
+ QueuePtr signalQ = new Queue(false);
+
+ QueuePtr broadcastQ = new Queue(true);
+
+ CountDownPtr cd = new CountDown(210);
+ list<TestThreadPtr> testThreads;
+ list<EnqueueThreadPtr> enqThreads;
+ int i;
+
+ for(i = 0; i < 100; i++)
+ {
+ TestThreadPtr p = new TestThread(cd, signalQ, i % 2);
+ p->start();
+ testThreads.push_back(p);
+ }
+ for(i = 0; i < 5; i++)
+ {
+ EnqueueThreadPtr p = new EnqueueThread(cd, signalQ, n);
+ p->start();
+ enqThreads.push_back(p);
+ }
+
+ for(i = 0; i < 100; i++)
+ {
+ TestThreadPtr p = new TestThread(cd, broadcastQ, i % 2);
+ p->start();
+ testThreads.push_back(p);
+ }
+ for(i = 0; i < 5; i++)
+ {
+ EnqueueThreadPtr p = new EnqueueThread(cd, broadcastQ, n);
+ p->start();
+ enqThreads.push_back(p);
+ }
+ cd->waitZero();
+
+ while(!enqThreads.empty())
+ {
+ EnqueueThreadPtr p = enqThreads.front();
+ enqThreads.pop_front();
+ p->getThreadControl().join();
+ }
+
+ signalQ->terminate();
+ broadcastQ->terminate();
+
+ while(!testThreads.empty())
+ {
+ TestThreadPtr p = testThreads.front();
+ testThreads.pop_front();
+ p->getThreadControl().join();
+ }
+
+ cout << " ok" << endl;
+
+ return 0;
+}