diff options
author | Matthew Newhook <matthew@zeroc.com> | 2007-06-01 04:17:02 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2007-06-01 04:17:02 +0000 |
commit | a6f0fb014ff7b646357d9b5d550eaa530dbc3455 (patch) | |
tree | fe3c952f0aa8c20ee0c9684ece815c454747c3fb /cpp/src/IceUtil/Cond.cpp | |
parent | http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=2229 (diff) | |
download | ice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.tar.bz2 ice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.tar.xz ice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.zip |
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=2229
Diffstat (limited to 'cpp/src/IceUtil/Cond.cpp')
-rw-r--r-- | cpp/src/IceUtil/Cond.cpp | 150 |
1 files changed, 127 insertions, 23 deletions
diff --git a/cpp/src/IceUtil/Cond.cpp b/cpp/src/IceUtil/Cond.cpp index 0f895980c9a..87f27e1e413 100644 --- a/cpp/src/IceUtil/Cond.cpp +++ b/cpp/src/IceUtil/Cond.cpp @@ -50,6 +50,7 @@ IceUtil::Semaphore::timedWait(const Time& timeout) const return rc != WAIT_TIMEOUT; } + void IceUtil::Semaphore::post(int count) const { @@ -60,6 +61,37 @@ IceUtil::Semaphore::post(int count) const } } +// +// The _queue semaphore is used to wait for the condition variable to +// be signaled. When signal is called any further thread signaling or +// threads waiting to wait (in preWait) are blocked from proceeding +// using an addition _gate semaphore) until the correct number of +// threads waiting on the _queue semaphore drain through postWait +// +// As each thread drains through postWait if there are further threads +// to unblock (toUnblock > 0) the _queue is posted again to wake a +// further waiting thread, otherwise the _gate is posted which permits +// any signaling or preWait threads to continue. Therefore, the _gate +// semaphore is used protect further entry into signal or wait until +// all signaled threads have woken. +// +// _blocked is the number of waiting threads. _unblocked is the +// number of threads which have unblocked. We use two variables +// because _blocked is protected by the _gate, whereas _unblocked is +// protected by the _internal mutex. There is an assumption here about +// memory visibility since postWait does not itself acquire the _gate +// semaphore (note that the _gate must be held if _toUnblock != 0). +// +// _toUnblock is a tri-state variable. 0 is no signal/broadcast +// pending. -1 is signal pending. 1 is a broadcast pending. +// +// Threads timing out present a particular issue because they may have +// woken without a corresponding notification and its easy to leave +// the _queue in a state where a spurious wakeup will occur -- +// consider a notify and a timed wake occuring at the same time. In +// this case, if we are not careful the _queue will have been posted, +// but the waking thread may not consume the semaphore. +// IceUtil::Cond::Cond() : _gate(1), _blocked(0), @@ -88,73 +120,145 @@ void IceUtil::Cond::wake(bool broadcast) { // - // Lock gate & mutex. + // Lock gate. The gate will be locked if there are threads waiting + // to drain from postWait. // _gate.wait(); - _internal.lock(); + // + // Lock the internal mutex. + // + IceUtil::Mutex::Lock sync(_internal); + + // + // Adjust the count of the number of waiting/blocked threads. + // if(_unblocked != 0) { _blocked -= _unblocked; _unblocked = 0; } + // + // If there are blocked threads then we set _toUnblock to the + // number of waiting threads if broadcast was called, or 1 if + // signal is called. + // if(_blocked > 0) { // - // Unblock some number of waiters. + // Unblock some number of waiters. We use -1 for the signal + // case. + // + assert(_toUnblock == 0); + _toUnblock = (broadcast) ? 1 : -1; + // + // Posting the queue wakes a single waiting thread. After this + // occurs the waiting thread will wake and then either post on + // the _queue to wake the next waiting thread, or post on the + // gate to permit more signaling to proceed. // - _toUnblock = (broadcast) ? _blocked : 1; - _internal.unlock(); + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + // + sync.release(); _queue.post(); } else { // - // Otherwise no blocked waiters, release gate & mutex. + // Otherwise no blocked waiters, release the gate. + // + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. // + sync.release(); _gate.post(); - _internal.unlock(); } } void IceUtil::Cond::preWait() const { + // + // _gate is used to protect _blocked. Furthermore, this prevents + // further threads from entering the wait state while a + // signal/broadcast is being processed. + // _gate.wait(); _blocked++; _gate.post(); } void -IceUtil::Cond::postWait(bool timedOut) const +IceUtil::Cond::postWait(bool timedOutOrFailed) const { - _internal.lock(); + IceUtil::Mutex::Lock sync(_internal); + + // + // One more thread has unblocked. + // _unblocked++; - if(_toUnblock != 0) + // + // If _toUnblock is 0 then this must be a timeout, otherwise its a + // spurious wakeup which is incorrect. + // + if(_toUnblock == 0) + { + assert(timedOutOrFailed); + return; + } + + if(timedOutOrFailed) { - bool last = --_toUnblock == 0; - _internal.unlock(); - - if(timedOut) + // + // If the thread was the last blocked thread and there's a + // pending signal/broadcast, reset the signal/broadcast to + // prevent spurious wakeup. + // + if(_blocked == _unblocked) { + _toUnblock = 0; + // + // Consume the queue post to prevent spurious wakeup. Note + // that although the internal mutex could be released + // prior to this wait() call, doing so gains nothing since + // this wait() MUST return immediately (if it does not + // there is a major bug and the entire application will + // deadlock). + // _queue.wait(); + // + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + // + sync.release(); + _gate.post(); } - - if(last) + } + else + { + // + // At this point, the thread must have been woken up because + // of a signal/broadcast. + // + if(_toUnblock == -1 || _blocked == _unblocked) // Signal or broadcast and no more blocked threads { + _toUnblock = 0; + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + sync.release(); _gate.post(); } - else + else // Broadcast and more blocked threads to wake up. { + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + sync.release(); _queue.post(); } } - else - { - _internal.unlock(); - } } void @@ -167,7 +271,7 @@ IceUtil::Cond::dowait() const } catch(...) { - postWait(false); + postWait(true); throw; } } @@ -183,7 +287,7 @@ IceUtil::Cond::timedDowait(const Time& timeout) const } catch(...) { - postWait(false); + postWait(true); throw; } } |