diff options
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index b2e7883c325..f7c25cc20f4 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -8,7 +8,6 @@ // ********************************************************************** #include <Ice/ThreadPool.h> -#include <IceUtil/DisableWarnings.h> #include <Ice/EventHandler.h> #include <Ice/Network.h> #include <Ice/LocalException.h> @@ -40,7 +39,7 @@ public: ShutdownWorkItem(const InstancePtr& instance) : _instance(instance) { } - + virtual void execute(ThreadPoolCurrent& current) { @@ -51,9 +50,9 @@ public: } catch(const CommunicatorDestroyedException&) { - } + } } - + private: const InstancePtr _instance; @@ -66,13 +65,13 @@ public: FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close) { } - + virtual void execute(ThreadPoolCurrent& current) { _handler->finished(current, _close); } - + private: const EventHandlerPtr _handler; @@ -133,11 +132,11 @@ class ThreadPoolDestroyedException } -IceInternal::DispatchWorkItem::DispatchWorkItem() +IceInternal::DispatchWorkItem::DispatchWorkItem() { } -IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection) +IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection) { } @@ -152,7 +151,7 @@ IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instanc _instance(instance), _selector(selector), _destroyed(false) -#ifdef ICE_USE_IOCP +#ifdef ICE_USE_IOCP , _info(SocketOperationRead) #endif { @@ -181,7 +180,7 @@ IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() Error out(_instance->initializationData().logger); out << "exception in selector while calling closeSocket():\n" << ex; } - + try { closeSocket(_fdIntrWrite); @@ -242,7 +241,7 @@ void IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) { ThreadPoolWorkItemPtr workItem; - { + { Lock sync(*this); if(!_workItems.empty()) { @@ -267,7 +266,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) { continue; } - + SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; @@ -345,7 +344,7 @@ IceInternal::ThreadPoolWorkQueue::postMessage() { continue; } - + SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; @@ -387,7 +386,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p int nProcessors = sysInfo.dwNumberOfProcessors; # else int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN)); -# endif +# endif #endif // @@ -416,7 +415,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")"; sizeMax = size; } - + int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn"); if(sizeWarn != 0 && sizeWarn < size) { @@ -461,7 +460,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p stackSize = 0; } const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize); - + const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != ""; const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority"); if(!_hasPriority) @@ -469,7 +468,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != ""; const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } - + _workQueue = new ThreadPoolWorkQueue(_instance, _selector); if(_instance->traceLevels()->threadPool >= 1) @@ -525,11 +524,11 @@ void IceInternal::ThreadPool::destroy() { Lock sync(*this); - if(_destroyed) + if(_destroyed) { return; } - + _destroyed = true; _workQueue->destroy(); } @@ -568,7 +567,7 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation { return; } - + _selector.update(handler.get(), remove, add); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead)) @@ -640,7 +639,7 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI else { workItem->run(); - } + } } void @@ -699,7 +698,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) catch(const exception& ex) { Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " + out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } catch(...) @@ -759,7 +758,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(!current._ioCompleted) { // - // The handler didn't call ioCompleted() so we take care of decreasing + // The handler didn't call ioCompleted() so we take care of decreasing // the IO thread count now. // --_inUseIO; @@ -831,9 +830,9 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(!current._handler) { // - // If there are no more ready handlers and there are still threads busy performing + // If there are no more ready handlers and there are still threads busy performing // IO, we give up leadership and promote another follower (which will perform the - // select() only once all the IOs are completed). Otherwise, if there are no more + // select() only once all the IOs are completed). Otherwise, if there are no more // threads peforming IOs, it's time to do another select(). // if(_inUseIO > 0) @@ -881,7 +880,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(_sizeMax > 1) { Lock sync(*this); - + if(_destroyed) { continue; @@ -904,14 +903,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) _threads.erase(thread); _workQueue->queue(new JoinThreadWorkItem(thread)); return; - } + } else if(_inUse > 0) { // // If this is the last idle thread but there are still other threads // busy dispatching, we go back waiting with _threadIdleTime. We only // wait with _serverIdleTime when there's only one thread left. - // + // continue; } assert(_threads.size() == 1); @@ -964,7 +963,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // because the wide string is using the platform default encoding. // Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "':\n" + out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data(), _instance->getStringConverter()) << "\nevent handler: " << current._handler->toString(); } @@ -999,7 +998,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { - + #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; @@ -1039,16 +1038,16 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) assert(_inUse >= 0); ++_inUse; - + if(_inUse == _sizeWarn) { Warning out(_instance->initializationData().logger); out << "thread pool `" << _prefix << "' is running low on threads\n" << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; } - + if(!_destroyed) - { + { assert(_inUse <= static_cast<int>(_threads.size())); if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size())) { @@ -1057,7 +1056,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); out << "growing " << _prefix << ": Size=" << _threads.size() + 1; } - + try { EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId()); @@ -1195,7 +1194,7 @@ bool IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current) { assert(!current._leader); - + current._thread->setState(ThreadStateIdle); // @@ -1216,7 +1215,7 @@ IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current) { if(!timedWait(IceUtil::Time::seconds(_threadIdleTime))) { - if(!_destroyed && (!_promote || _inUseIO == _sizeIO || + if(!_destroyed && (!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))) { if(_instance->traceLevels()->threadPool >= 1) @@ -1246,7 +1245,7 @@ string IceInternal::ThreadPool::nextThreadId() { ostringstream os; - os << _prefix << "-" << _nextThreadId++; + os << _prefix << "-" << _nextThreadId++; return os.str(); } @@ -1316,7 +1315,7 @@ IceInternal::ThreadPool::EventHandlerThread::run() catch(...) { Error out(_pool->_instance->initializationData().logger); - out << "unknown exception in `" << _pool->_prefix << "'"; + out << "unknown exception in `" << _pool->_prefix << "'"; } _observer.detach(); @@ -1342,12 +1341,12 @@ IceInternal::ThreadPool::EventHandlerThread::run() _pool = 0; // Break cyclic dependency. } -ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, +ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool, const ThreadPool::EventHandlerThreadPtr& thread) : - operation(SocketOperationNone), - stream(instance.get(), Ice::currentProtocolEncoding), - _threadPool(threadPool.get()), + operation(SocketOperationNone), + stream(instance.get(), Ice::currentProtocolEncoding), + _threadPool(threadPool.get()), _thread(thread), _ioCompleted(false) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) |