summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/test/IceUtil/condvar/WorkQueue.cpp158
-rwxr-xr-xcpp/test/IceUtil/condvar/run.py6
-rwxr-xr-xscripts/Expect.py26
3 files changed, 59 insertions, 131 deletions
diff --git a/cpp/test/IceUtil/condvar/WorkQueue.cpp b/cpp/test/IceUtil/condvar/WorkQueue.cpp
index c6045b2ccbd..274c3329452 100644
--- a/cpp/test/IceUtil/condvar/WorkQueue.cpp
+++ b/cpp/test/IceUtil/condvar/WorkQueue.cpp
@@ -81,14 +81,7 @@ public:
{
Monitor<Mutex>::Lock lock(*this);
_terminate = true;
- if(_broadcast)
- {
- notifyAll();
- }
- else
- {
- notify();
- }
+ notifyAll();
}
bool
@@ -113,102 +106,56 @@ public:
assert(!_q.empty());
ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
_q.pop_front();
return true;
}
-private:
- const bool _broadcast;
- bool _terminate;
- list<int> _q;
-};
-typedef Handle<Queue> QueuePtr;
-
-static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER;
-
-class WatchDog : public Thread, public Monitor<Mutex>
-{
-public:
-
- WatchDog(bool verbose) :
- _verbose(verbose), _terminate(false), _touches(0), _timeout(0), _overallTouches(0), _overallTimeout(0)
- {
- }
-
-
- virtual void
- run()
+ int
+ get()
{
- Monitor<Mutex>::Lock sync(*this);
-
- while(true)
+ Monitor<Mutex>::Lock lock(*this);
+ while(_q.empty() && !_terminate)
{
- timedWait(Time::milliSeconds(1000));
- if(!_terminate && _touches == 0)
- {
- cout << _overallTouches << "/" << _overallTimeout
- << ": DEADLOCK DETECTED" << endl;
- abort();
- }
-
- IceUtil::StaticMutex::Lock outputMutex(coutMutex);
- if(_verbose)
- {
- cout << _touches << "(" << _timeout << ") " << flush;
- }
- _overallTouches += _touches;
- _overallTimeout += _timeout;
- _touches = 0;
- _timeout = 0;
- if(_terminate)
- {
- return;
- }
+ wait();
}
- }
- void
- touch(bool timeout)
- {
- Monitor<Mutex>::Lock sync(*this);
- _touches++;
- if(timeout)
+ // We only report the termination sentinel when the queue is
+ // empty.
+ if(_q.empty())
{
- _timeout++;
+ assert(_terminate);
+ return -1;
}
- }
- void
- terminate()
- {
- Monitor<Mutex>::Lock sync(*this);
- _terminate = true;
- notify();
- }
-
- void
- dump()
- {
- cout << _overallTouches << "/" << _overallTimeout;
+ assert(!_q.empty());
+ int ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
+ _q.pop_front();
+ return ret;
}
private:
-
- bool _verbose;
+ const bool _broadcast;
bool _terminate;
- int _touches;
- int _timeout;
- long _overallTouches;
- long _overallTimeout;
+ list<int> _q;
};
-typedef Handle<WatchDog> WatchDogPtr;
+typedef Handle<Queue> QueuePtr;
+
+static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER;
class TestThread : public Thread
{
public:
- TestThread(const CountDownPtr& cd, const WatchDogPtr& dog, const QueuePtr& q) :
- _cd(cd), _dog(dog), _q(q)
+ TestThread(const CountDownPtr& cd, const QueuePtr& q, bool poll) :
+ _cd(cd), _q(q), _poll(poll)
{
}
virtual void
@@ -218,9 +165,14 @@ public:
while(true)
{
int res = 0;
- // This is a poll.
- bool tout = _q->timedGet(res, Time::milliSeconds(1));
- _dog->touch(!tout);
+ if(_poll)
+ {
+ _q->timedGet(res, Time::milliSeconds(10));
+ }
+ else
+ {
+ res = _q->get();
+ }
if(res == -1)
{
return;
@@ -230,8 +182,8 @@ public:
private:
const CountDownPtr _cd;
- const WatchDogPtr _dog;
const QueuePtr _q;
+ const bool _poll;
};
typedef Handle<TestThread> TestThreadPtr;
@@ -253,9 +205,9 @@ public:
{
while(true)
{
- _q->put(_v);
+ _q->put(_v++);
+ ThreadControl::yield();
}
- ThreadControl::yield();
}
else
{
@@ -302,12 +254,10 @@ main(int argc, char** argv)
int n = atoi(opts.optArg("n").c_str());
bool verbose = opts.isSet("v");
- cout << "running signal/broadcast timeout test... " << flush;
+ cout << "running signal/broadcast timeout test" << flush;
QueuePtr signalQ = new Queue(false);
- WatchDogPtr signalDog = new WatchDog(verbose);
QueuePtr broadcastQ = new Queue(true);
- WatchDogPtr broadcastDog = new WatchDog(verbose);
CountDownPtr cd = new CountDown(210);
list<TestThreadPtr> testThreads;
@@ -316,7 +266,7 @@ main(int argc, char** argv)
for(i = 0; i < 100; i++)
{
- TestThreadPtr p = new TestThread(cd, signalDog, signalQ);
+ TestThreadPtr p = new TestThread(cd, signalQ, i % 2);
p->start();
testThreads.push_back(p);
}
@@ -329,7 +279,7 @@ main(int argc, char** argv)
for(i = 0; i < 100; i++)
{
- TestThreadPtr p = new TestThread(cd, broadcastDog, broadcastQ);
+ TestThreadPtr p = new TestThread(cd, broadcastQ, i % 2);
p->start();
testThreads.push_back(p);
}
@@ -341,9 +291,6 @@ main(int argc, char** argv)
}
cd->waitZero();
- signalDog->start();
- broadcastDog->start();
-
while(!enqThreads.empty())
{
EnqueueThreadPtr p = enqThreads.front();
@@ -361,22 +308,7 @@ main(int argc, char** argv)
p->getThreadControl().join();
}
- if(verbose)
- {
- cout << endl;
- }
- broadcastDog->terminate();
- broadcastDog->getThreadControl().join();
-
- signalDog->terminate();
- signalDog->getThreadControl().join();
-
- cout << "broadcast (";
- broadcastDog->dump();
-
- cout << ") signal (";
- signalDog->dump();
- cout << ") ok" << endl;
+ cout << " ok" << endl;
return 0;
}
diff --git a/cpp/test/IceUtil/condvar/run.py b/cpp/test/IceUtil/condvar/run.py
index 33387d3fa79..50c762bf86b 100755
--- a/cpp/test/IceUtil/condvar/run.py
+++ b/cpp/test/IceUtil/condvar/run.py
@@ -22,19 +22,13 @@ from scripts import *
workqueue = os.path.join(os.getcwd(), "workqueue")
-print "starting workqueue...",
client = TestUtil.spawnClient(workqueue)
-print "ok"
client.waitTestSuccess()
match = os.path.join(os.getcwd(), "match")
-print "starting signal match...",
client = TestUtil.spawnClient(match)
-print "ok"
client.waitTestSuccess()
-print "starting broadcast match...",
client = TestUtil.spawnClient(match + " -b")
-print "ok"
client.waitTestSuccess()
diff --git a/scripts/Expect.py b/scripts/Expect.py
index eec13384150..05a1558234e 100755
--- a/scripts/Expect.py
+++ b/scripts/Expect.py
@@ -101,18 +101,20 @@ class reader(threading.Thread):
def trace(self, c):
if self._trace:
- self._tbuf.write(c)
- if c == '\n':
- content = self._tbuf.getvalue()
- supress = False
- if self._tracesupress:
- for p in self._tracesupress:
- if p.search(content):
- supress = True
- break
- if not supress:
- sys.stdout.write(content)
- self._tbuf.truncate(0)
+ if self._tracesupress:
+ self._tbuf.write(c)
+ if c == '\n':
+ content = self._tbuf.getvalue()
+ supress = False
+ for p in self._tracesupress:
+ if p.search(content):
+ supress = True
+ break
+ if not supress:
+ sys.stdout.write(content)
+ self._tbuf.truncate(0)
+ else:
+ sys.stdout.write(c)
def enabletrace(self, supress = None):
self.cv.acquire()