summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorJose <jose@zeroc.com>2012-07-13 00:18:06 +0200
committerJose <jose@zeroc.com>2012-07-13 00:18:06 +0200
commit70802b63320582f0afa8229659ea9fe4a21d02ec (patch)
treeeb455947cc774cc558f96b8d7c78373d2a6f1c2b /cpp/src/Ice/ThreadPool.cpp
parentICE-4839 - Glacier2 sessionHelper IceSSL plug-in (diff)
downloadice-70802b63320582f0afa8229659ea9fe4a21d02ec.tar.bz2
ice-70802b63320582f0afa8229659ea9fe4a21d02ec.tar.xz
ice-70802b63320582f0afa8229659ea9fe4a21d02ec.zip
WinRT support
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp128
1 files changed, 80 insertions, 48 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index db412ed06f8..288377987db 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -8,14 +8,15 @@
// **********************************************************************
//
-// The following is required for GetThreadIOPendingFlag
+// The following is required to bring in some definitions. Don't
+// define with Metro Style apps.
//
-#if defined(_WIN32) && !defined(_WIN32_WINNT)
-# define _WIN32_WINNT 0x0501
+#if defined(_WIN32) && !defined(_WIN32_WINNT) && WINAPI_FAMILY != 0x02
+# define _WIN32_WINNT 0x0501
#endif
-#include <IceUtil/DisableWarnings.h>
#include <Ice/ThreadPool.h>
+#include <IceUtil/DisableWarnings.h>
#include <Ice/EventHandler.h>
#include <Ice/Network.h>
#include <Ice/LocalException.h>
@@ -26,6 +27,10 @@
#include <Ice/Properties.h>
#include <Ice/TraceLevels.h>
+#if defined(ICE_OS_WINRT)
+# include <IceUtil/Unicode.h>
+#endif
+
using namespace std;
using namespace Ice;
using namespace IceInternal;
@@ -157,11 +162,11 @@ IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool,
_instance(instance),
_selector(selector),
_destroyed(false)
-#ifdef ICE_USE_IOCP
+#ifdef ICE_USE_IOCP
, _info(SocketOperationRead)
#endif
{
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
SOCKET fds[2];
createPipe(fds);
_fdIntrRead = fds[0];
@@ -176,7 +181,7 @@ IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
{
assert(_destroyed);
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
try
{
closeSocket(_fdIntrRead);
@@ -217,7 +222,7 @@ IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
_workItems.push_back(item);
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
if(_workItems.size() == 1)
{
postMessage();
@@ -227,7 +232,7 @@ IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
#endif
}
-#ifdef ICE_USE_IOCP
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
bool
IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation)
{
@@ -254,18 +259,18 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
workItem = _workItems.front();
_workItems.pop_front();
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
if(_workItems.empty())
{
char c;
while(true)
{
ssize_t ret;
-#ifdef _WIN32
+# ifdef _WIN32
ret = ::recv(_fdIntrRead, &c, 1, 0);
-#else
+# else
ret = ::read(_fdIntrRead, &c, 1);
-#endif
+# endif
if(ret == SOCKET_ERROR)
{
if(interrupted())
@@ -285,7 +290,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
else
{
assert(_destroyed);
-#ifdef ICE_USE_IOCP
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
postMessage();
#endif
}
@@ -317,24 +322,40 @@ IceInternal::ThreadPoolWorkQueue::toString() const
NativeInfoPtr
IceInternal::ThreadPoolWorkQueue::getNativeInfo()
{
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
return new NativeInfo(_fdIntrRead);
-#endif
+#else
return 0;
+#endif
}
void
IceInternal::ThreadPoolWorkQueue::postMessage()
{
-#ifndef ICE_USE_IOCP
+#if defined(ICE_USE_IOCP)
+ if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this),
+# if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60
+ reinterpret_cast<LPOVERLAPPED>(&_info)
+# else
+ &_info
+# endif
+ ))
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
+ }
+#elif defined(ICE_OS_WINRT)
+ _selector.completed(this, SocketOperationRead);
+#else
char c = 0;
while(true)
{
-#ifdef _WIN32
+# ifdef _WIN32
if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
-#else
+# else
if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
-#endif
+# endif
{
if(interrupted())
{
@@ -347,19 +368,6 @@ IceInternal::ThreadPoolWorkQueue::postMessage()
}
break;
}
-#else
- if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this),
-#if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60
- reinterpret_cast<LPOVERLAPPED>(&_info)
-#else
- &_info
-#endif
- ))
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
#endif
}
@@ -379,21 +387,22 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_threadIdleTime(0),
_stackSize(0),
_inUse(0),
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
_inUseIO(0),
_nextHandler(_handlers.end()),
#endif
_promote(true)
{
PropertiesPtr properties = _instance->initializationData().properties;
-
-#ifdef _WIN32
+#ifndef ICE_OS_WINRT
+# ifdef _WIN32
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int nProcessors = sysInfo.dwNumberOfProcessors;
-#else
+# else
int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
-#endif
+# endif
+#endif
//
// We use just one thread as the default. This is the fastest
@@ -409,10 +418,12 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
}
int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+#ifndef ICE_OS_WINRT
if(sizeMax == -1)
{
sizeMax = nProcessors;
}
+#endif
if(sizeMax < size)
{
Warning out(_instance->initializationData().logger);
@@ -445,7 +456,11 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
const_cast<int&>(_size) = size;
const_cast<int&>(_sizeMax) = sizeMax;
const_cast<int&>(_sizeWarn) = sizeWarn;
+#ifndef ICE_OS_WINRT
const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors);
+#else
+ const_cast<int&>(_sizeIO) = sizeMax;
+#endif
const_cast<int&>(_threadIdleTime) = threadIdleTime;
#ifdef ICE_USE_IOCP
@@ -460,7 +475,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
stackSize = 0;
}
const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
-
+
const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
if(!_hasPriority)
@@ -468,7 +483,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
}
-
+
_workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
if(_instance->traceLevels()->threadPool >= 1)
@@ -550,7 +565,7 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
{
Lock sync(*this);
assert(!_destroyed);
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
_selector.finish(handler.get()); // This must be called before!
_workQueue->queue(new FinishedWorkItem(handler));
@@ -596,7 +611,7 @@ IceInternal::ThreadPool::joinWithAllThreads()
(*p)->getThreadControl().join();
}
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
_selector.finish(_workQueue.get());
#endif
_selector.destroy();
@@ -611,7 +626,7 @@ IceInternal::ThreadPool::prefix() const
void
IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
{
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
ThreadPoolCurrent current(_instance, this);
bool select = false;
vector<pair<EventHandler*, SocketOperation> > handlers;
@@ -633,6 +648,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
<< current._handler->toString();
}
+#ifdef ICE_OS_WINRT
+ catch(Platform::Exception^ ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data())
+ << "\nevent handler: " << current._handler->toString();
+ }
+#endif
catch(...)
{
Error out(_instance->initializationData().logger);
@@ -767,13 +790,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
}
else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit.
{
+#ifndef ICE_OS_WINRT
BOOL hasIO = false;
GetThreadIOPendingFlag(GetCurrentThread(), &hasIO);
if(hasIO)
{
continue;
}
-
+#endif
if(_instance->traceLevels()->threadPool >= 1)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
@@ -782,7 +806,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
_threads.erase(thread);
_workQueue->queue(new JoinThreadWorkItem(thread));
return;
- }
+ }
else if(_inUse > 0)
{
//
@@ -831,6 +855,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
Error out(_instance->initializationData().logger);
out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString();
}
+#ifdef ICE_OS_WINRT
+ catch(Platform::Exception^ ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data())
+ << "\nevent handler: " << current._handler->toString();
+ }
+#endif
catch(...)
{
Error out(_instance->initializationData().logger);
@@ -848,7 +880,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_sizeMax > 1)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
if(_serialize && !_destroyed)
@@ -915,7 +947,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
return _serialize;
}
-#ifdef ICE_USE_IOCP
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
bool
IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
{
@@ -1133,7 +1165,7 @@ ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPo
stream(instance.get()),
_threadPool(threadPool.get()),
_ioCompleted(false)
-#ifndef ICE_USE_IOCP
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
, _leader(false)
#endif
{