summaryrefslogtreecommitdiff
path: root/cpp/src/IceUtil/Cond.cpp
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2007-06-01 04:17:02 +0000
committerMatthew Newhook <matthew@zeroc.com>2007-06-01 04:17:02 +0000
commita6f0fb014ff7b646357d9b5d550eaa530dbc3455 (patch)
treefe3c952f0aa8c20ee0c9684ece815c454747c3fb /cpp/src/IceUtil/Cond.cpp
parenthttp://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=2229 (diff)
downloadice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.tar.bz2
ice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.tar.xz
ice-a6f0fb014ff7b646357d9b5d550eaa530dbc3455.zip
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=2229
Diffstat (limited to 'cpp/src/IceUtil/Cond.cpp')
-rw-r--r--cpp/src/IceUtil/Cond.cpp150
1 files changed, 127 insertions, 23 deletions
diff --git a/cpp/src/IceUtil/Cond.cpp b/cpp/src/IceUtil/Cond.cpp
index 0f895980c9a..87f27e1e413 100644
--- a/cpp/src/IceUtil/Cond.cpp
+++ b/cpp/src/IceUtil/Cond.cpp
@@ -50,6 +50,7 @@ IceUtil::Semaphore::timedWait(const Time& timeout) const
return rc != WAIT_TIMEOUT;
}
+
void
IceUtil::Semaphore::post(int count) const
{
@@ -60,6 +61,37 @@ IceUtil::Semaphore::post(int count) const
}
}
+//
+// The _queue semaphore is used to wait for the condition variable to
+// be signaled. When signal is called any further thread signaling or
+// threads waiting to wait (in preWait) are blocked from proceeding
+// using an addition _gate semaphore) until the correct number of
+// threads waiting on the _queue semaphore drain through postWait
+//
+// As each thread drains through postWait if there are further threads
+// to unblock (toUnblock > 0) the _queue is posted again to wake a
+// further waiting thread, otherwise the _gate is posted which permits
+// any signaling or preWait threads to continue. Therefore, the _gate
+// semaphore is used protect further entry into signal or wait until
+// all signaled threads have woken.
+//
+// _blocked is the number of waiting threads. _unblocked is the
+// number of threads which have unblocked. We use two variables
+// because _blocked is protected by the _gate, whereas _unblocked is
+// protected by the _internal mutex. There is an assumption here about
+// memory visibility since postWait does not itself acquire the _gate
+// semaphore (note that the _gate must be held if _toUnblock != 0).
+//
+// _toUnblock is a tri-state variable. 0 is no signal/broadcast
+// pending. -1 is signal pending. 1 is a broadcast pending.
+//
+// Threads timing out present a particular issue because they may have
+// woken without a corresponding notification and its easy to leave
+// the _queue in a state where a spurious wakeup will occur --
+// consider a notify and a timed wake occuring at the same time. In
+// this case, if we are not careful the _queue will have been posted,
+// but the waking thread may not consume the semaphore.
+//
IceUtil::Cond::Cond() :
_gate(1),
_blocked(0),
@@ -88,73 +120,145 @@ void
IceUtil::Cond::wake(bool broadcast)
{
//
- // Lock gate & mutex.
+ // Lock gate. The gate will be locked if there are threads waiting
+ // to drain from postWait.
//
_gate.wait();
- _internal.lock();
+ //
+ // Lock the internal mutex.
+ //
+ IceUtil::Mutex::Lock sync(_internal);
+
+ //
+ // Adjust the count of the number of waiting/blocked threads.
+ //
if(_unblocked != 0)
{
_blocked -= _unblocked;
_unblocked = 0;
}
+ //
+ // If there are blocked threads then we set _toUnblock to the
+ // number of waiting threads if broadcast was called, or 1 if
+ // signal is called.
+ //
if(_blocked > 0)
{
//
- // Unblock some number of waiters.
+ // Unblock some number of waiters. We use -1 for the signal
+ // case.
+ //
+ assert(_toUnblock == 0);
+ _toUnblock = (broadcast) ? 1 : -1;
+ //
+ // Posting the queue wakes a single waiting thread. After this
+ // occurs the waiting thread will wake and then either post on
+ // the _queue to wake the next waiting thread, or post on the
+ // gate to permit more signaling to proceed.
//
- _toUnblock = (broadcast) ? _blocked : 1;
- _internal.unlock();
+ // Release before posting to avoid potential immediate
+ // context switch due to the mutex being locked.
+ //
+ sync.release();
_queue.post();
}
else
{
//
- // Otherwise no blocked waiters, release gate & mutex.
+ // Otherwise no blocked waiters, release the gate.
+ //
+ // Release before posting to avoid potential immediate
+ // context switch due to the mutex being locked.
//
+ sync.release();
_gate.post();
- _internal.unlock();
}
}
void
IceUtil::Cond::preWait() const
{
+ //
+ // _gate is used to protect _blocked. Furthermore, this prevents
+ // further threads from entering the wait state while a
+ // signal/broadcast is being processed.
+ //
_gate.wait();
_blocked++;
_gate.post();
}
void
-IceUtil::Cond::postWait(bool timedOut) const
+IceUtil::Cond::postWait(bool timedOutOrFailed) const
{
- _internal.lock();
+ IceUtil::Mutex::Lock sync(_internal);
+
+ //
+ // One more thread has unblocked.
+ //
_unblocked++;
- if(_toUnblock != 0)
+ //
+ // If _toUnblock is 0 then this must be a timeout, otherwise its a
+ // spurious wakeup which is incorrect.
+ //
+ if(_toUnblock == 0)
+ {
+ assert(timedOutOrFailed);
+ return;
+ }
+
+ if(timedOutOrFailed)
{
- bool last = --_toUnblock == 0;
- _internal.unlock();
-
- if(timedOut)
+ //
+ // If the thread was the last blocked thread and there's a
+ // pending signal/broadcast, reset the signal/broadcast to
+ // prevent spurious wakeup.
+ //
+ if(_blocked == _unblocked)
{
+ _toUnblock = 0;
+ //
+ // Consume the queue post to prevent spurious wakeup. Note
+ // that although the internal mutex could be released
+ // prior to this wait() call, doing so gains nothing since
+ // this wait() MUST return immediately (if it does not
+ // there is a major bug and the entire application will
+ // deadlock).
+ //
_queue.wait();
+ //
+ // Release before posting to avoid potential immediate
+ // context switch due to the mutex being locked.
+ //
+ sync.release();
+ _gate.post();
}
-
- if(last)
+ }
+ else
+ {
+ //
+ // At this point, the thread must have been woken up because
+ // of a signal/broadcast.
+ //
+ if(_toUnblock == -1 || _blocked == _unblocked) // Signal or broadcast and no more blocked threads
{
+ _toUnblock = 0;
+ // Release before posting to avoid potential immediate
+ // context switch due to the mutex being locked.
+ sync.release();
_gate.post();
}
- else
+ else // Broadcast and more blocked threads to wake up.
{
+ // Release before posting to avoid potential immediate
+ // context switch due to the mutex being locked.
+ sync.release();
_queue.post();
}
}
- else
- {
- _internal.unlock();
- }
}
void
@@ -167,7 +271,7 @@ IceUtil::Cond::dowait() const
}
catch(...)
{
- postWait(false);
+ postWait(true);
throw;
}
}
@@ -183,7 +287,7 @@ IceUtil::Cond::timedDowait(const Time& timeout) const
}
catch(...)
{
- postWait(false);
+ postWait(true);
throw;
}
}