diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2009-10-02 09:35:28 -0230 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2009-10-02 09:35:28 -0230 |
commit | 6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0 (patch) | |
tree | ff37056717cff166bc705112f54e98074f8d1f40 /cpp/src/Ice/ThreadPool.cpp | |
parent | 3772. Recovering from Glacier2 / Ice router session failure. (diff) | |
download | ice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.tar.bz2 ice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.tar.xz ice-6d8d16b3761eaa24c9c754dd0f2cc1a70de8fad0.zip |
C++Builder 2010 support
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 315 |
1 files changed, 151 insertions, 164 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index d5ff021a660..3834d0898dc 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -112,230 +112,217 @@ class ThreadPoolDestroyedException } -namespace IceInternal -{ - -class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex -{ -public: - - ThreadPoolWorkQueue(ThreadPool* threadPool, const InstancePtr& instance, Selector& selector) : - _threadPool(threadPool), - _instance(instance), - _selector(selector), - _destroyed(false) +IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool, + const InstancePtr& instance, + Selector& selector) : + _threadPool(threadPool), + _instance(instance), + _selector(selector), + _destroyed(false) #ifdef ICE_USE_IOCP - , _info(SocketOperationRead) + , _info(SocketOperationRead) #endif - { +{ #ifndef ICE_USE_IOCP - SOCKET fds[2]; - createPipe(fds); - _fdIntrRead = fds[0]; - _fdIntrWrite = fds[1]; + SOCKET fds[2]; + createPipe(fds); + _fdIntrRead = fds[0]; + _fdIntrWrite = fds[1]; - _selector.initialize(this); - _selector.update(this, SocketOperationNone, SocketOperationRead); + _selector.initialize(this); + _selector.update(this, SocketOperationNone, SocketOperationRead); #endif - } +} - ~ThreadPoolWorkQueue() - { - assert(_destroyed); +IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() +{ + assert(_destroyed); #ifndef ICE_USE_IOCP - try - { - closeSocket(_fdIntrRead); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } - - try - { - closeSocket(_fdIntrWrite); - } - catch(const LocalException& ex) - { - Error out(_instance->initializationData().logger); - out << "exception in selector while calling closeSocket():\n" << ex; - } -#endif + try + { + closeSocket(_fdIntrRead); } - - void destroy() + catch(const LocalException& ex) { - Lock sync(*this); - assert(!_destroyed); - _destroyed = true; - postMessage(); + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } + + try + { + closeSocket(_fdIntrWrite); } + catch(const LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector while calling closeSocket():\n" << ex; + } +#endif +} + +void +IceInternal::ThreadPoolWorkQueue::destroy() +{ + Lock sync(*this); + assert(!_destroyed); + _destroyed = true; + postMessage(); +} - void queue(const ThreadPoolWorkItemPtr& item) +void +IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) +{ + Lock sync(*this); + if(_destroyed) { - Lock sync(*this); - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - _workItems.push_back(item); + throw CommunicatorDestroyedException(__FILE__, __LINE__); + } + _workItems.push_back(item); #ifndef ICE_USE_IOCP - if(_workItems.size() == 1) - { - postMessage(); - } -#else + if(_workItems.size() == 1) + { postMessage(); -#endif } +#else + postMessage(); +#endif +} #ifdef ICE_USE_IOCP - bool startAsync(SocketOperation) - { - assert(false); - return false; - } +bool +IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation) +{ + assert(false); + return false; +} - bool finishAsync(SocketOperation) - { - assert(false); - return false; - } +bool +IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation) +{ + assert(false); + return false; +} #endif - virtual void message(ThreadPoolCurrent& current) - { - ThreadPoolWorkItemPtr workItem; - { - Lock sync(*this); - if(!_workItems.empty()) - { - workItem = _workItems.front(); - _workItems.pop_front(); +void +IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) +{ + ThreadPoolWorkItemPtr workItem; + { + Lock sync(*this); + if(!_workItems.empty()) + { + workItem = _workItems.front(); + _workItems.pop_front(); #ifndef ICE_USE_IOCP - if(_workItems.empty()) + if(_workItems.empty()) + { + char c; + while(true) { - char c; - while(true) - { - ssize_t ret; + ssize_t ret; #ifdef _WIN32 - ret = ::recv(_fdIntrRead, &c, 1, 0); + ret = ::recv(_fdIntrRead, &c, 1, 0); #else - ret = ::read(_fdIntrRead, &c, 1); + ret = ::read(_fdIntrRead, &c, 1); #endif - if(ret == SOCKET_ERROR) + if(ret == SOCKET_ERROR) + { + if(interrupted()) { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + continue; } - break; + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } + break; } -#endif } - else - { - assert(_destroyed); -#ifdef ICE_USE_IOCP - postMessage(); #endif - } - } - - if(workItem) - { - workItem->execute(current); } else { - current.ioCompleted(); - throw ThreadPoolDestroyedException(); + assert(_destroyed); +#ifdef ICE_USE_IOCP + postMessage(); +#endif } } - virtual void finished(ThreadPoolCurrent&) + if(workItem) { - assert(false); + workItem->execute(current); } - - virtual string toString() const + else { - return "work queue"; + current.ioCompleted(); + throw ThreadPoolDestroyedException(); } +} - virtual NativeInfoPtr getNativeInfo() - { +void +IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&) +{ + assert(false); +} + +string +IceInternal::ThreadPoolWorkQueue::toString() const +{ + return "work queue"; +} + +NativeInfoPtr +IceInternal::ThreadPoolWorkQueue::getNativeInfo() +{ #ifndef ICE_USE_IOCP - return new NativeInfo(_fdIntrRead); + return new NativeInfo(_fdIntrRead); #endif - return 0; - } + return 0; +} - virtual void postMessage() - { +void +IceInternal::ThreadPoolWorkQueue::postMessage() +{ #ifndef ICE_USE_IOCP - char c = 0; - while(true) - { + char c = 0; + while(true) + { #ifdef _WIN32 - if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) + if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) #else - if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) + if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) #endif + { + if(interrupted()) { - if(interrupted()) - { - continue; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + continue; } - break; + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } + break; + } #else - if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), + if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), #if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60 - reinterpret_cast<LPOVERLAPPED>(&_info) + reinterpret_cast<LPOVERLAPPED>(&_info) #else - &_info -#endif - )) - { - SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } + &_info #endif + )) + { + SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; } - -private: - - const ThreadPool* _threadPool; - const InstancePtr _instance; - Selector& _selector; - bool _destroyed; -#ifdef ICE_USE_IOCP - AsyncInfo _info; -#else - SOCKET _fdIntrRead; - SOCKET _fdIntrWrite; #endif - list<ThreadPoolWorkItemPtr> _workItems; -}; - } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : |