diff options
-rw-r--r-- | cpp/test/IceUtil/condvar/WorkQueue.cpp | 158 | ||||
-rwxr-xr-x | cpp/test/IceUtil/condvar/run.py | 6 | ||||
-rwxr-xr-x | scripts/Expect.py | 26 |
3 files changed, 59 insertions, 131 deletions
diff --git a/cpp/test/IceUtil/condvar/WorkQueue.cpp b/cpp/test/IceUtil/condvar/WorkQueue.cpp index c6045b2ccbd..274c3329452 100644 --- a/cpp/test/IceUtil/condvar/WorkQueue.cpp +++ b/cpp/test/IceUtil/condvar/WorkQueue.cpp @@ -81,14 +81,7 @@ public: { Monitor<Mutex>::Lock lock(*this); _terminate = true; - if(_broadcast) - { - notifyAll(); - } - else - { - notify(); - } + notifyAll(); } bool @@ -113,102 +106,56 @@ public: assert(!_q.empty()); ret = _q.front(); + if(ret % 100 == 0) + { + cout << "." << flush; + } _q.pop_front(); return true; } -private: - const bool _broadcast; - bool _terminate; - list<int> _q; -}; -typedef Handle<Queue> QueuePtr; - -static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER; - -class WatchDog : public Thread, public Monitor<Mutex> -{ -public: - - WatchDog(bool verbose) : - _verbose(verbose), _terminate(false), _touches(0), _timeout(0), _overallTouches(0), _overallTimeout(0) - { - } - - - virtual void - run() + int + get() { - Monitor<Mutex>::Lock sync(*this); - - while(true) + Monitor<Mutex>::Lock lock(*this); + while(_q.empty() && !_terminate) { - timedWait(Time::milliSeconds(1000)); - if(!_terminate && _touches == 0) - { - cout << _overallTouches << "/" << _overallTimeout - << ": DEADLOCK DETECTED" << endl; - abort(); - } - - IceUtil::StaticMutex::Lock outputMutex(coutMutex); - if(_verbose) - { - cout << _touches << "(" << _timeout << ") " << flush; - } - _overallTouches += _touches; - _overallTimeout += _timeout; - _touches = 0; - _timeout = 0; - if(_terminate) - { - return; - } + wait(); } - } - void - touch(bool timeout) - { - Monitor<Mutex>::Lock sync(*this); - _touches++; - if(timeout) + // We only report the termination sentinel when the queue is + // empty. + if(_q.empty()) { - _timeout++; + assert(_terminate); + return -1; } - } - void - terminate() - { - Monitor<Mutex>::Lock sync(*this); - _terminate = true; - notify(); - } - - void - dump() - { - cout << _overallTouches << "/" << _overallTimeout; + assert(!_q.empty()); + int ret = _q.front(); + if(ret % 100 == 0) + { + cout << "." << flush; + } + _q.pop_front(); + return ret; } private: - - bool _verbose; + const bool _broadcast; bool _terminate; - int _touches; - int _timeout; - long _overallTouches; - long _overallTimeout; + list<int> _q; }; -typedef Handle<WatchDog> WatchDogPtr; +typedef Handle<Queue> QueuePtr; + +static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER; class TestThread : public Thread { public: - TestThread(const CountDownPtr& cd, const WatchDogPtr& dog, const QueuePtr& q) : - _cd(cd), _dog(dog), _q(q) + TestThread(const CountDownPtr& cd, const QueuePtr& q, bool poll) : + _cd(cd), _q(q), _poll(poll) { } virtual void @@ -218,9 +165,14 @@ public: while(true) { int res = 0; - // This is a poll. - bool tout = _q->timedGet(res, Time::milliSeconds(1)); - _dog->touch(!tout); + if(_poll) + { + _q->timedGet(res, Time::milliSeconds(10)); + } + else + { + res = _q->get(); + } if(res == -1) { return; @@ -230,8 +182,8 @@ public: private: const CountDownPtr _cd; - const WatchDogPtr _dog; const QueuePtr _q; + const bool _poll; }; typedef Handle<TestThread> TestThreadPtr; @@ -253,9 +205,9 @@ public: { while(true) { - _q->put(_v); + _q->put(_v++); + ThreadControl::yield(); } - ThreadControl::yield(); } else { @@ -302,12 +254,10 @@ main(int argc, char** argv) int n = atoi(opts.optArg("n").c_str()); bool verbose = opts.isSet("v"); - cout << "running signal/broadcast timeout test... " << flush; + cout << "running signal/broadcast timeout test" << flush; QueuePtr signalQ = new Queue(false); - WatchDogPtr signalDog = new WatchDog(verbose); QueuePtr broadcastQ = new Queue(true); - WatchDogPtr broadcastDog = new WatchDog(verbose); CountDownPtr cd = new CountDown(210); list<TestThreadPtr> testThreads; @@ -316,7 +266,7 @@ main(int argc, char** argv) for(i = 0; i < 100; i++) { - TestThreadPtr p = new TestThread(cd, signalDog, signalQ); + TestThreadPtr p = new TestThread(cd, signalQ, i % 2); p->start(); testThreads.push_back(p); } @@ -329,7 +279,7 @@ main(int argc, char** argv) for(i = 0; i < 100; i++) { - TestThreadPtr p = new TestThread(cd, broadcastDog, broadcastQ); + TestThreadPtr p = new TestThread(cd, broadcastQ, i % 2); p->start(); testThreads.push_back(p); } @@ -341,9 +291,6 @@ main(int argc, char** argv) } cd->waitZero(); - signalDog->start(); - broadcastDog->start(); - while(!enqThreads.empty()) { EnqueueThreadPtr p = enqThreads.front(); @@ -361,22 +308,7 @@ main(int argc, char** argv) p->getThreadControl().join(); } - if(verbose) - { - cout << endl; - } - broadcastDog->terminate(); - broadcastDog->getThreadControl().join(); - - signalDog->terminate(); - signalDog->getThreadControl().join(); - - cout << "broadcast ("; - broadcastDog->dump(); - - cout << ") signal ("; - signalDog->dump(); - cout << ") ok" << endl; + cout << " ok" << endl; return 0; } diff --git a/cpp/test/IceUtil/condvar/run.py b/cpp/test/IceUtil/condvar/run.py index 33387d3fa79..50c762bf86b 100755 --- a/cpp/test/IceUtil/condvar/run.py +++ b/cpp/test/IceUtil/condvar/run.py @@ -22,19 +22,13 @@ from scripts import * workqueue = os.path.join(os.getcwd(), "workqueue") -print "starting workqueue...", client = TestUtil.spawnClient(workqueue) -print "ok" client.waitTestSuccess() match = os.path.join(os.getcwd(), "match") -print "starting signal match...", client = TestUtil.spawnClient(match) -print "ok" client.waitTestSuccess() -print "starting broadcast match...", client = TestUtil.spawnClient(match + " -b") -print "ok" client.waitTestSuccess() diff --git a/scripts/Expect.py b/scripts/Expect.py index eec13384150..05a1558234e 100755 --- a/scripts/Expect.py +++ b/scripts/Expect.py @@ -101,18 +101,20 @@ class reader(threading.Thread): def trace(self, c): if self._trace: - self._tbuf.write(c) - if c == '\n': - content = self._tbuf.getvalue() - supress = False - if self._tracesupress: - for p in self._tracesupress: - if p.search(content): - supress = True - break - if not supress: - sys.stdout.write(content) - self._tbuf.truncate(0) + if self._tracesupress: + self._tbuf.write(c) + if c == '\n': + content = self._tbuf.getvalue() + supress = False + for p in self._tracesupress: + if p.search(content): + supress = True + break + if not supress: + sys.stdout.write(content) + self._tbuf.truncate(0) + else: + sys.stdout.write(c) def enabletrace(self, supress = None): self.cv.acquire() |