diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 128 |
1 files changed, 80 insertions, 48 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 9d4d9db3169..e1fd5ace7e6 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -8,14 +8,15 @@ // ********************************************************************** // -// The following is required for GetThreadIOPendingFlag +// The following is required to bring in some definitions. Don't +// define with Metro Style apps. // -#if defined(_WIN32) && !defined(_WIN32_WINNT) -# define _WIN32_WINNT 0x0501 +#if defined(_WIN32) && !defined(_WIN32_WINNT) && WINAPI_FAMILY != 0x02 +# define _WIN32_WINNT 0x0501 #endif -#include <IceUtil/DisableWarnings.h> #include <Ice/ThreadPool.h> +#include <IceUtil/DisableWarnings.h> #include <Ice/EventHandler.h> #include <Ice/Network.h> #include <Ice/LocalException.h> @@ -26,6 +27,10 @@ #include <Ice/Properties.h> #include <Ice/TraceLevels.h> +#if defined(ICE_OS_WINRT) +# include <IceUtil/Unicode.h> +#endif + using namespace std; using namespace Ice; using namespace IceInternal; @@ -157,11 +162,11 @@ IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool, _instance(instance), _selector(selector), _destroyed(false) -#ifdef ICE_USE_IOCP +#ifdef ICE_USE_IOCP , _info(SocketOperationRead) #endif { -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) SOCKET fds[2]; createPipe(fds); _fdIntrRead = fds[0]; @@ -176,7 +181,7 @@ IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() { assert(_destroyed); -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) try { closeSocket(_fdIntrRead); @@ -217,7 +222,7 @@ IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _workItems.push_back(item); -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(_workItems.size() == 1) { postMessage(); @@ -227,7 +232,7 @@ IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) #endif } -#ifdef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) bool IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation) { @@ -254,18 +259,18 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) workItem = _workItems.front(); _workItems.pop_front(); -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(_workItems.empty()) { char c; while(true) { ssize_t ret; -#ifdef _WIN32 +# ifdef _WIN32 ret = ::recv(_fdIntrRead, &c, 1, 0); -#else +# else ret = ::read(_fdIntrRead, &c, 1); -#endif +# endif if(ret == SOCKET_ERROR) { if(interrupted()) @@ -285,7 +290,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) else { assert(_destroyed); -#ifdef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) postMessage(); #endif } @@ -317,24 +322,40 @@ IceInternal::ThreadPoolWorkQueue::toString() const NativeInfoPtr IceInternal::ThreadPoolWorkQueue::getNativeInfo() { -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) return new NativeInfo(_fdIntrRead); -#endif +#else return 0; +#endif } void IceInternal::ThreadPoolWorkQueue::postMessage() { -#ifndef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) + if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), +# if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60 + reinterpret_cast<LPOVERLAPPED>(&_info) +# else + &_info +# endif + )) + { + SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +#elif defined(ICE_OS_WINRT) + _selector.completed(this, SocketOperationRead); +#else char c = 0; while(true) { -#ifdef _WIN32 +# ifdef _WIN32 if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) -#else +# else if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) -#endif +# endif { if(interrupted()) { @@ -347,19 +368,6 @@ IceInternal::ThreadPoolWorkQueue::postMessage() } break; } -#else - if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), -#if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60 - reinterpret_cast<LPOVERLAPPED>(&_info) -#else - &_info -#endif - )) - { - SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } #endif } @@ -379,21 +387,22 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _threadIdleTime(0), _stackSize(0), _inUse(0), -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _inUseIO(0), _nextHandler(_handlers.end()), #endif _promote(true) { PropertiesPtr properties = _instance->initializationData().properties; - -#ifdef _WIN32 +#ifndef ICE_OS_WINRT +# ifdef _WIN32 SYSTEM_INFO sysInfo; GetSystemInfo(&sysInfo); int nProcessors = sysInfo.dwNumberOfProcessors; -#else +# else int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN)); -#endif +# endif +#endif // // We use just one thread as the default. This is the fastest @@ -409,10 +418,12 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p } int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); +#ifndef ICE_OS_WINRT if(sizeMax == -1) { sizeMax = nProcessors; } +#endif if(sizeMax < size) { Warning out(_instance->initializationData().logger); @@ -445,7 +456,11 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p const_cast<int&>(_size) = size; const_cast<int&>(_sizeMax) = sizeMax; const_cast<int&>(_sizeWarn) = sizeWarn; +#ifndef ICE_OS_WINRT const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors); +#else + const_cast<int&>(_sizeIO) = sizeMax; +#endif const_cast<int&>(_threadIdleTime) = threadIdleTime; #ifdef ICE_USE_IOCP @@ -460,7 +475,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p stackSize = 0; } const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize); - + const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != ""; const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority"); if(!_hasPriority) @@ -468,7 +483,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != ""; const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } - + _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); if(_instance->traceLevels()->threadPool >= 1) @@ -550,7 +565,7 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) { Lock sync(*this); assert(!_destroyed); -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _selector.finish(handler.get()); // This must be called before! _workQueue->queue(new FinishedWorkItem(handler)); @@ -596,7 +611,7 @@ IceInternal::ThreadPool::joinWithAllThreads() (*p)->getThreadControl().join(); } -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _selector.finish(_workQueue.get()); #endif _selector.destroy(); @@ -611,7 +626,7 @@ IceInternal::ThreadPool::prefix() const void IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) { -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) ThreadPoolCurrent current(_instance, this); bool select = false; vector<pair<EventHandler*, SocketOperation> > handlers; @@ -633,6 +648,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } +#ifdef ICE_OS_WINRT + catch(Platform::Exception^ ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) + << "\nevent handler: " << current._handler->toString(); + } +#endif catch(...) { Error out(_instance->initializationData().logger); @@ -767,13 +790,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) } else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit. { +#ifndef ICE_OS_WINRT BOOL hasIO = false; GetThreadIOPendingFlag(GetCurrentThread(), &hasIO); if(hasIO) { continue; } - +#endif if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); @@ -782,7 +806,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) _threads.erase(thread); _workQueue->queue(new JoinThreadWorkItem(thread)); return; - } + } else if(_inUse > 0) { // @@ -831,6 +855,14 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } +#ifdef ICE_OS_WINRT + catch(Platform::Exception^ ex) + { + Error out(_instance->initializationData().logger); + out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) + << "\nevent handler: " << current._handler->toString(); + } +#endif catch(...) { Error out(_instance->initializationData().logger); @@ -848,7 +880,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; if(_serialize && !_destroyed) @@ -915,7 +947,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) return _serialize; } -#ifdef ICE_USE_IOCP +#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) bool IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) { @@ -1133,7 +1165,7 @@ ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPo stream(instance.get(), Ice::currentProtocolEncoding), _threadPool(threadPool.get()), _ioCompleted(false) -#ifndef ICE_USE_IOCP +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) , _leader(false) #endif { |