summaryrefslogtreecommitdiff
path: root/cpp/test/IceUtil/condvar/WorkQueue.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2009-01-09 11:49:20 -0330
committerMatthew Newhook <matthew@zeroc.com>2009-01-09 11:49:20 -0330
commit9117e9040c02465cb9f0a1e0bcc6aa963f71c61a (patch)
treeef345adb5612327136e70c8207182e015dfc6349 /cpp/test/IceUtil/condvar/WorkQueue.cpp
parentFixed NRVO demo depend file (diff)
parenthttp://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=3553. database demo uses w... (diff)
downloadice-9117e9040c02465cb9f0a1e0bcc6aa963f71c61a.tar.bz2
ice-9117e9040c02465cb9f0a1e0bcc6aa963f71c61a.tar.xz
ice-9117e9040c02465cb9f0a1e0bcc6aa963f71c61a.zip
Merge commit 'origin/R3_3_branch'
Conflicts: cs/demo/WCF/latency/Client.cs cs/demo/WCF/latency_m/Client.cs cs/demo/WCF/throughput/Client.cs cs/demo/WCF/throughput_m/Client.cs cs/demo/WCF/throughput_m/Service.cs java/demo/Database/library/BookI.java java/demo/Database/library/BookQueryResultI.java java/demo/Database/library/Client.java java/demo/Database/library/ConnectionPool.java java/demo/Database/library/DispatchInterceptorI.java java/demo/Database/library/Glacier2Session.ice java/demo/Database/library/Glacier2SessionManagerI.java java/demo/Database/library/Grammar.java java/demo/Database/library/Library.ice java/demo/Database/library/LibraryI.java java/demo/Database/library/Parser.java java/demo/Database/library/ReapThread.java java/demo/Database/library/RunParser.java java/demo/Database/library/SQLRequestContext.java java/demo/Database/library/Scanner.java java/demo/Database/library/Server.java java/demo/Database/library/Session.ice java/demo/Database/library/SessionFactoryI.java java/demo/Database/library/SessionI.java java/demo/Database/library/Token.java java/demo/Database/library/build.xml java/demo/Database/library/config.client
Diffstat (limited to 'cpp/test/IceUtil/condvar/WorkQueue.cpp')
-rw-r--r--cpp/test/IceUtil/condvar/WorkQueue.cpp158
1 files changed, 45 insertions, 113 deletions
diff --git a/cpp/test/IceUtil/condvar/WorkQueue.cpp b/cpp/test/IceUtil/condvar/WorkQueue.cpp
index c6045b2ccbd..274c3329452 100644
--- a/cpp/test/IceUtil/condvar/WorkQueue.cpp
+++ b/cpp/test/IceUtil/condvar/WorkQueue.cpp
@@ -81,14 +81,7 @@ public:
{
Monitor<Mutex>::Lock lock(*this);
_terminate = true;
- if(_broadcast)
- {
- notifyAll();
- }
- else
- {
- notify();
- }
+ notifyAll();
}
bool
@@ -113,102 +106,56 @@ public:
assert(!_q.empty());
ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
_q.pop_front();
return true;
}
-private:
- const bool _broadcast;
- bool _terminate;
- list<int> _q;
-};
-typedef Handle<Queue> QueuePtr;
-
-static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER;
-
-class WatchDog : public Thread, public Monitor<Mutex>
-{
-public:
-
- WatchDog(bool verbose) :
- _verbose(verbose), _terminate(false), _touches(0), _timeout(0), _overallTouches(0), _overallTimeout(0)
- {
- }
-
-
- virtual void
- run()
+ int
+ get()
{
- Monitor<Mutex>::Lock sync(*this);
-
- while(true)
+ Monitor<Mutex>::Lock lock(*this);
+ while(_q.empty() && !_terminate)
{
- timedWait(Time::milliSeconds(1000));
- if(!_terminate && _touches == 0)
- {
- cout << _overallTouches << "/" << _overallTimeout
- << ": DEADLOCK DETECTED" << endl;
- abort();
- }
-
- IceUtil::StaticMutex::Lock outputMutex(coutMutex);
- if(_verbose)
- {
- cout << _touches << "(" << _timeout << ") " << flush;
- }
- _overallTouches += _touches;
- _overallTimeout += _timeout;
- _touches = 0;
- _timeout = 0;
- if(_terminate)
- {
- return;
- }
+ wait();
}
- }
- void
- touch(bool timeout)
- {
- Monitor<Mutex>::Lock sync(*this);
- _touches++;
- if(timeout)
+ // We only report the termination sentinel when the queue is
+ // empty.
+ if(_q.empty())
{
- _timeout++;
+ assert(_terminate);
+ return -1;
}
- }
- void
- terminate()
- {
- Monitor<Mutex>::Lock sync(*this);
- _terminate = true;
- notify();
- }
-
- void
- dump()
- {
- cout << _overallTouches << "/" << _overallTimeout;
+ assert(!_q.empty());
+ int ret = _q.front();
+ if(ret % 100 == 0)
+ {
+ cout << "." << flush;
+ }
+ _q.pop_front();
+ return ret;
}
private:
-
- bool _verbose;
+ const bool _broadcast;
bool _terminate;
- int _touches;
- int _timeout;
- long _overallTouches;
- long _overallTimeout;
+ list<int> _q;
};
-typedef Handle<WatchDog> WatchDogPtr;
+typedef Handle<Queue> QueuePtr;
+
+static IceUtil::StaticMutex coutMutex = ICE_STATIC_MUTEX_INITIALIZER;
class TestThread : public Thread
{
public:
- TestThread(const CountDownPtr& cd, const WatchDogPtr& dog, const QueuePtr& q) :
- _cd(cd), _dog(dog), _q(q)
+ TestThread(const CountDownPtr& cd, const QueuePtr& q, bool poll) :
+ _cd(cd), _q(q), _poll(poll)
{
}
virtual void
@@ -218,9 +165,14 @@ public:
while(true)
{
int res = 0;
- // This is a poll.
- bool tout = _q->timedGet(res, Time::milliSeconds(1));
- _dog->touch(!tout);
+ if(_poll)
+ {
+ _q->timedGet(res, Time::milliSeconds(10));
+ }
+ else
+ {
+ res = _q->get();
+ }
if(res == -1)
{
return;
@@ -230,8 +182,8 @@ public:
private:
const CountDownPtr _cd;
- const WatchDogPtr _dog;
const QueuePtr _q;
+ const bool _poll;
};
typedef Handle<TestThread> TestThreadPtr;
@@ -253,9 +205,9 @@ public:
{
while(true)
{
- _q->put(_v);
+ _q->put(_v++);
+ ThreadControl::yield();
}
- ThreadControl::yield();
}
else
{
@@ -302,12 +254,10 @@ main(int argc, char** argv)
int n = atoi(opts.optArg("n").c_str());
bool verbose = opts.isSet("v");
- cout << "running signal/broadcast timeout test... " << flush;
+ cout << "running signal/broadcast timeout test" << flush;
QueuePtr signalQ = new Queue(false);
- WatchDogPtr signalDog = new WatchDog(verbose);
QueuePtr broadcastQ = new Queue(true);
- WatchDogPtr broadcastDog = new WatchDog(verbose);
CountDownPtr cd = new CountDown(210);
list<TestThreadPtr> testThreads;
@@ -316,7 +266,7 @@ main(int argc, char** argv)
for(i = 0; i < 100; i++)
{
- TestThreadPtr p = new TestThread(cd, signalDog, signalQ);
+ TestThreadPtr p = new TestThread(cd, signalQ, i % 2);
p->start();
testThreads.push_back(p);
}
@@ -329,7 +279,7 @@ main(int argc, char** argv)
for(i = 0; i < 100; i++)
{
- TestThreadPtr p = new TestThread(cd, broadcastDog, broadcastQ);
+ TestThreadPtr p = new TestThread(cd, broadcastQ, i % 2);
p->start();
testThreads.push_back(p);
}
@@ -341,9 +291,6 @@ main(int argc, char** argv)
}
cd->waitZero();
- signalDog->start();
- broadcastDog->start();
-
while(!enqThreads.empty())
{
EnqueueThreadPtr p = enqThreads.front();
@@ -361,22 +308,7 @@ main(int argc, char** argv)
p->getThreadControl().join();
}
- if(verbose)
- {
- cout << endl;
- }
- broadcastDog->terminate();
- broadcastDog->getThreadControl().join();
-
- signalDog->terminate();
- signalDog->getThreadControl().join();
-
- cout << "broadcast (";
- broadcastDog->dump();
-
- cout << ") signal (";
- signalDog->dump();
- cout << ") ok" << endl;
+ cout << " ok" << endl;
return 0;
}