summaryrefslogtreecommitdiff
path: root/cpp/test/IceUtil/priority/PriorityInversion.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/IceUtil/priority/PriorityInversion.cpp')
-rw-r--r--cpp/test/IceUtil/priority/PriorityInversion.cpp389
1 files changed, 389 insertions, 0 deletions
diff --git a/cpp/test/IceUtil/priority/PriorityInversion.cpp b/cpp/test/IceUtil/priority/PriorityInversion.cpp
new file mode 100644
index 00000000000..828f689f6f1
--- /dev/null
+++ b/cpp/test/IceUtil/priority/PriorityInversion.cpp
@@ -0,0 +1,389 @@
+
+#include <PriorityInversion.h>
+#include <IceUtil/Thread.h>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Mutex.h>
+#include <IceUtil/Monitor.h>
+#include <IceUtil/RecMutex.h>
+
+#include <sstream>
+#include <TestCommon.h>
+#include <vector>
+#include <map>
+
+using namespace std;
+using namespace IceUtil;
+
+long fib_num ( long n)
+{
+ if ( n == 0)
+ {
+ return 0;
+ }
+
+ else if ( n == 1 )
+ {
+ return 1;
+ }
+ else
+ {
+ return fib_num( n -1) + fib_num(n-2);
+ }
+}
+
+class TaskCollector : public IceUtil::Shared
+{
+public:
+
+ TaskCollector(int cores, int high, int medium, int low, Monitor<Mutex>& monitor) :
+ _lowBegin(0),
+ _lowEnd(0),
+ _mediumBegin(0),
+ _mediumEnd(0),
+ _highBegin(0),
+ _highEnd(0),
+ _cores(cores),
+ _high(high),
+ _medium(medium),
+ _low(low),
+ _monitor(monitor)
+ {
+ }
+
+ void waitAll()
+ {
+ Monitor<Mutex>::Lock lock(_monitor);
+ while(_mediumBegin < _cores)
+ {
+ //Wait until all task are ready to compete by processors
+ _monitor.wait();
+ }
+ }
+
+ void taskBegin(int priority)
+ {
+ Monitor<Mutex>::Lock lock(_monitor);
+ if(priority == _low)
+ {
+ _lowBegin++;
+ }
+ else if(priority == _medium)
+ {
+ _mediumBegin++;
+ }
+ else if(priority == _high)
+ {
+ _highBegin++;
+ }
+ _monitor.notifyAll();
+ }
+
+ void taskEnd(int priority)
+ {
+ Monitor<Mutex>::Lock lock(_monitor);
+ //
+ // Test all task begin run before any task ends.
+ //
+ test(_lowBegin == 1);
+ test(_highBegin == 1);
+ test(_mediumBegin == _cores);
+ if(priority == _low)
+ {
+ //
+ // Low priority thread should end before all medium priority threads.
+ //
+ test(_mediumEnd == 0);
+ _lowEnd++;
+ }
+ else if(priority == _medium)
+ {
+ //
+ // When the first medium priority task end the
+ // low priority task completed.
+ //
+ test(_lowEnd > 0);
+ _mediumEnd++;
+ }
+ }
+
+private:
+
+ int _lowBegin;
+ int _lowEnd;
+ int _mediumBegin;
+ int _mediumEnd;
+ int _highBegin;
+ int _highEnd;
+ int _cores;
+ int _high;
+ int _medium;
+ int _low;
+ Monitor<Mutex>& _monitor;
+ IceUtil::Mutex _mutex;
+};
+typedef IceUtil::Handle<TaskCollector> TaskCollectorPtr;
+
+
+class SharedResource : public IceUtil::Shared
+{
+public:
+
+ SharedResource(const TaskCollectorPtr& taskCollector) :
+ _taskCollector(taskCollector)
+ {
+ }
+
+ TaskCollectorPtr taskCollector() const { return _taskCollector; }
+
+ virtual void run(int priority) = 0;
+
+private:
+
+ TaskCollectorPtr _taskCollector;
+};
+typedef IceUtil::Handle<SharedResource> SharedResourcePtr;
+
+class SharedResourceMutex : public SharedResource
+{
+public:
+
+ SharedResourceMutex(const TaskCollectorPtr& taskCollector) :
+ SharedResource(taskCollector)
+ {
+ }
+
+ virtual void run(int priority)
+ {
+ taskCollector()->taskBegin(priority);
+ Mutex::Lock lock(_mutex);
+ taskCollector()->waitAll();
+ fib_num(30);
+ taskCollector()->taskEnd(priority);
+ }
+
+private:
+
+ IceUtil::Mutex _mutex;
+};
+
+
+class SharedResourceRecMutex : public SharedResource
+{
+public:
+
+ SharedResourceRecMutex(const TaskCollectorPtr& taskCollector) :
+ SharedResource(taskCollector)
+ {
+ }
+
+ void run(int priority)
+ {
+ taskCollector()->taskBegin(priority);
+ RecMutex::Lock lock(_mutex);
+ taskCollector()->waitAll();
+ fib_num(30);
+ taskCollector()->taskEnd(priority);
+ }
+
+private:
+
+ IceUtil::RecMutex _mutex;
+};
+
+class ThreadCommon : public IceUtil::Thread
+{
+public:
+
+ virtual void run() = 0;
+ int getPriority()
+ {
+#ifdef _WIN32_WCE
+ return CeGetThreadPriority(GetCurrentThread());
+#elif defined _WIN32
+ return GetThreadPriority(GetCurrentThread());
+#else
+ sched_param param;
+ int sched_policy;
+ pthread_t thread = pthread_self();
+ pthread_getschedparam(thread, &sched_policy, &param);
+ return param.sched_priority;
+#endif
+ }
+};
+
+class Task : public ThreadCommon
+{
+public:
+
+ Task(const SharedResourcePtr& shared) :
+ _shared(shared)
+ {
+ }
+
+ virtual void run()
+ {
+ _shared->run(getPriority());
+ }
+
+private:
+
+ SharedResourcePtr _shared;
+};
+typedef IceUtil::Handle<Task> TaskPtr;
+
+class MediumPriorityThread : public ThreadCommon
+{
+public:
+
+ MediumPriorityThread(const TaskCollectorPtr& taskCollector, const ThreadPtr& highPriorityThread, int timeout) :
+ _taskCollector(taskCollector),
+ _highPriorityThread(highPriorityThread),
+ _timeout(IceUtil::Time::seconds(timeout))
+ {
+ }
+
+ virtual void run()
+ {
+ IceUtil::Time timestamp = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ _taskCollector->taskBegin(getPriority());
+ while(true)
+ {
+ if(IceUtil::Time::now(IceUtil::Time::Monotonic) - timestamp > _timeout)
+ {
+ // If high priority task do not end with the specific timeout means
+ // that the low priority task priority was not bosted so we are having
+ // the clasic priority inversion issue.
+ test(false);
+ }
+ if(!_highPriorityThread->isAlive())
+ {
+ break;
+ }
+ fib_num(10);
+ }
+ _taskCollector->taskEnd(getPriority());
+ }
+
+private:
+
+ const TaskCollectorPtr _taskCollector;
+ const ThreadPtr _highPriorityThread;
+ const IceUtil::Time _timeout;
+};
+
+static const string priorityTestName("priority inversion");
+
+PriorityInversionTest::PriorityInversionTest() :
+ TestBase(priorityTestName)
+{
+}
+
+void
+PriorityInversionTest::run()
+{
+ int cores, high, medium, low, timeout;
+ timeout = 2;
+#ifdef _WIN32
+ return; //Priority inversion is not supported by WIN32
+#else
+ cores = sysconf(_SC_NPROCESSORS_ONLN);
+ high = 45;
+ medium = 35;
+ low = 1;
+#endif
+
+ {
+ Monitor<Mutex> monitor;
+ TaskCollectorPtr collector = new TaskCollector(cores, high, medium, low, monitor);
+ vector<ThreadControl> threads;
+
+ SharedResourcePtr shared = new SharedResourceMutex(collector);
+
+ //
+ // Create one low priority thread.
+ //
+ ThreadPtr lowThread = new Task(shared);
+ threads.push_back(lowThread->start(128, low));
+
+ //
+ // Create one high priority thread that use the same shared resource
+ // as the previous low priority thread
+ //
+ ThreadPtr highThread = new Task(shared);
+ threads.push_back(highThread->start(128, high));
+
+ //
+ // Create one medium priority thread per core.
+ //
+ for(int cont = 0; cont < cores; ++cont)
+ {
+ ThreadPtr t = new MediumPriorityThread(collector, highThread, timeout);
+ threads.push_back(t->start(128, medium));
+ }
+
+ //
+ // Join with all the threads.
+ //
+ vector<ThreadControl>::iterator it;
+ for(it = threads.begin(); it != threads.end(); ++it)
+ {
+ try
+ {
+ (*it).join();
+ }
+ catch(...)
+ {
+ }
+ }
+ }
+
+ //
+ // Same test with a recursive mutex.
+ //
+ {
+ Monitor<Mutex> monitor;
+ TaskCollectorPtr collector = new TaskCollector(cores, high, medium, low, monitor);
+
+ SharedResourcePtr shared = new SharedResourceRecMutex(collector);
+
+ vector<ThreadControl> threads;
+
+ //
+ // Create one low priority thread.
+ //
+ ThreadPtr lowThread = new Task(shared);
+ threads.push_back(lowThread->start(128, low));
+
+ //
+ // Create one high priority thread that use the same shared resource
+ // as the previous low priority thread.
+ //
+ ThreadPtr highThread = new Task(shared);
+ threads.push_back(highThread->start(128, high));
+
+ //
+ // Create one medium priority tasks per core that runs until
+ // the high priority thread is running.
+ //
+ for(int cont = 0; cont < cores; ++cont)
+ {
+ ThreadPtr t = new MediumPriorityThread(collector, highThread, timeout);
+ threads.push_back(t->start(128, medium));
+ }
+
+ //
+ // Join with all the threads.
+ //
+ vector<ThreadControl>::iterator it;
+ for(it = threads.begin(); it != threads.end(); ++it)
+ {
+ try
+ {
+ (*it).join();
+ }
+ catch(...)
+ {
+ }
+ }
+ }
+}