// ********************************************************************** // // Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. // // ********************************************************************** #include #include #include #include #include #include #include #include #include #include #include #if defined(ICE_OS_WINRT) # include #endif using namespace std; using namespace Ice; using namespace IceInternal; ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; } ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPoolWorkItem* p) { return p; } namespace { class ShutdownWorkItem : public ThreadPoolWorkItem { public: ShutdownWorkItem(const InstancePtr& instance) : _instance(instance) { } virtual void execute(ThreadPoolCurrent& current) { current.ioCompleted(); try { _instance->objectAdapterFactory()->shutdown(); } catch(const CommunicatorDestroyedException&) { } } private: const InstancePtr _instance; }; class FinishedWorkItem : public ThreadPoolWorkItem { public: FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler) { } virtual void execute(ThreadPoolCurrent& current) { _handler->finished(current); } private: const EventHandlerPtr _handler; }; class JoinThreadWorkItem : public ThreadPoolWorkItem { public: JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread) { } virtual void execute(ThreadPoolCurrent&) { // No call to ioCompleted, this shouldn't block (and we don't want to cause // a new thread to be started). _thread->getThreadControl().join(); } private: IceUtil::ThreadPtr _thread; }; // // Exception raised by the thread pool work queue when the thread pool // is destroyed. // class ThreadPoolDestroyedException { }; } IceInternal::DispatchWorkItem::DispatchWorkItem(const InstancePtr& instance) : _instance(instance) { } void IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) { Ice::DispatcherPtr dispatcher = _instance->initializationData().dispatcher; if(dispatcher) { try { dispatcher->dispatch(this, 0); } catch(const std::exception& ex) { if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) { Warning out(_instance->initializationData().logger); out << "dispatch exception:\n" << ex; } } catch(...) { if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) { Warning out(_instance->initializationData().logger); out << "dispatch exception:\nunknown c++ exception"; } } } else { current.ioCompleted(); // Promote a follower. run(); } } IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool, const InstancePtr& instance, Selector& selector) : _threadPool(threadPool), _instance(instance), _selector(selector), _destroyed(false) #ifdef ICE_USE_IOCP , _info(SocketOperationRead) #endif { #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) SOCKET fds[2]; createPipe(fds); _fdIntrRead = fds[0]; _fdIntrWrite = fds[1]; _selector.initialize(this); _selector.update(this, SocketOperationNone, SocketOperationRead); #endif } IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue() { assert(_destroyed); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) try { closeSocket(_fdIntrRead); } catch(const LocalException& ex) { Error out(_instance->initializationData().logger); out << "exception in selector while calling closeSocket():\n" << ex; } try { closeSocket(_fdIntrWrite); } catch(const LocalException& ex) { Error out(_instance->initializationData().logger); out << "exception in selector while calling closeSocket():\n" << ex; } #endif } void IceInternal::ThreadPoolWorkQueue::destroy() { Lock sync(*this); assert(!_destroyed); _destroyed = true; postMessage(); } void IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item) { Lock sync(*this); if(_destroyed) { throw CommunicatorDestroyedException(__FILE__, __LINE__); } _workItems.push_back(item); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(_workItems.size() == 1) { postMessage(); } #else postMessage(); #endif } #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) bool IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation) { assert(false); return false; } bool IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation) { assert(false); return false; } #endif void IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) { ThreadPoolWorkItemPtr workItem; { Lock sync(*this); if(!_workItems.empty()) { workItem = _workItems.front(); _workItems.pop_front(); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) if(_workItems.empty()) { char c; while(true) { ssize_t ret; # ifdef _WIN32 ret = ::recv(_fdIntrRead, &c, 1, 0); # else ret = ::read(_fdIntrRead, &c, 1); # endif if(ret == SOCKET_ERROR) { if(interrupted()) { continue; } SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } break; } } #endif } else { assert(_destroyed); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) postMessage(); #endif } } if(workItem) { workItem->execute(current); } else { current.ioCompleted(); throw ThreadPoolDestroyedException(); } } void IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&) { assert(false); } string IceInternal::ThreadPoolWorkQueue::toString() const { return "work queue"; } NativeInfoPtr IceInternal::ThreadPoolWorkQueue::getNativeInfo() { #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) return new NativeInfo(_fdIntrRead); #else return 0; #endif } void IceInternal::ThreadPoolWorkQueue::postMessage() { #if defined(ICE_USE_IOCP) if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast(this), &_info)) { SocketException ex(__FILE__, __LINE__); ex.error = GetLastError(); throw ex; } #elif defined(ICE_OS_WINRT) _selector.completed(this, SocketOperationRead); #else char c = 0; while(true) { # ifdef _WIN32 if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) # else if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR) # endif { if(interrupted()) { continue; } SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; } break; } #endif } IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) : _instance(instance), _destroyed(false), _prefix(prefix), _selector(instance), _size(0), _sizeIO(0), _sizeMax(0), _sizeWarn(0), _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0), _hasPriority(false), _priority(0), _serverIdleTime(timeout), _threadIdleTime(0), _stackSize(0), _inUse(0), #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _inUseIO(0), _nextHandler(_handlers.end()), #endif _promote(true) { PropertiesPtr properties = _instance->initializationData().properties; #ifndef ICE_OS_WINRT # ifdef _WIN32 SYSTEM_INFO sysInfo; GetSystemInfo(&sysInfo); int nProcessors = sysInfo.dwNumberOfProcessors; # else int nProcessors = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); # endif #endif // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and // doesn't require to make the servants thread safe. // int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1); if(size < 1) { Warning out(_instance->initializationData().logger); out << _prefix << ".Size < 1; Size adjusted to 1"; size = 1; } int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); #ifndef ICE_OS_WINRT if(sizeMax == -1) { sizeMax = nProcessors; } #endif if(sizeMax < size) { Warning out(_instance->initializationData().logger); out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")"; sizeMax = size; } int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn"); if(sizeWarn != 0 && sizeWarn < size) { Warning out(_instance->initializationData().logger); out << _prefix << ".SizeWarn < " << _prefix << ".Size; adjusted SizeWarn to Size (" << size << ")"; sizeWarn = size; } else if(sizeWarn > sizeMax) { Warning out(_instance->initializationData().logger); out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")"; sizeWarn = sizeMax; } int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); if(threadIdleTime < 0) { Warning out(_instance->initializationData().logger); out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; threadIdleTime = 0; } const_cast(_size) = size; const_cast(_sizeMax) = sizeMax; const_cast(_sizeWarn) = sizeWarn; #ifndef ICE_OS_WINRT const_cast(_sizeIO) = min(sizeMax, nProcessors); #else const_cast(_sizeIO) = sizeMax; #endif const_cast(_threadIdleTime) = threadIdleTime; #ifdef ICE_USE_IOCP _selector.setup(_sizeIO); #endif int stackSize = properties->getPropertyAsInt(_prefix + ".StackSize"); if(stackSize < 0) { Warning out(_instance->initializationData().logger); out << _prefix << ".StackSize < 0; Size adjusted to OS default"; stackSize = 0; } const_cast(_stackSize) = static_cast(stackSize); const_cast(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != ""; const_cast(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority"); if(!_hasPriority) { const_cast(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != ""; const_cast(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority"); } _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); out << "creating " << _prefix << ": Size = " << _size << ", SizeMax = " << _sizeMax << ", SizeWarn = " << _sizeWarn; } __setNoDelete(true); try { for(int i = 0 ; i < _size ; ++i) { IceUtil::ThreadPtr thread = new EventHandlerThread(this); if(_hasPriority) { thread->start(_stackSize, _priority); } else { thread->start(_stackSize); } _threads.insert(thread); } } catch(const IceUtil::Exception& ex) { { Error out(_instance->initializationData().logger); out << "cannot create thread for `" << _prefix << "':\n" << ex; } destroy(); joinWithAllThreads(); __setNoDelete(false); throw; } catch(...) { __setNoDelete(false); throw; } __setNoDelete(false); } IceInternal::ThreadPool::~ThreadPool() { assert(_destroyed); } void IceInternal::ThreadPool::destroy() { Lock sync(*this); assert(!_destroyed); _destroyed = true; _workQueue->destroy(); } void IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) { Lock sync(*this); assert(!_destroyed); _selector.initialize(handler.get()); } void IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add) { Lock sync(*this); assert(!_destroyed); _selector.update(handler.get(), remove, add); } void IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) { Lock sync(*this); assert(!_destroyed); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _selector.finish(handler.get()); // This must be called before! _workQueue->queue(new FinishedWorkItem(handler)); // // Clear the current ready handlers. The handlers from this vector can't be // reference counted and a handler might get destroyed once it's finished. // _handlers.clear(); _nextHandler = _handlers.end(); #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) { _workQueue->queue(new FinishedWorkItem(handler)); _selector.finish(handler.get()); } else { handler->_finish = true; } #endif } void IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem) { _workQueue->queue(workItem); } void IceInternal::ThreadPool::joinWithAllThreads() { assert(_destroyed); // // _threads is immutable after destroy() has been called, // therefore no synchronization is needed. (Synchronization // wouldn't be possible here anyway, because otherwise the other // threads would never terminate.) // for(set::iterator p = _threads.begin(); p != _threads.end(); ++p) { (*p)->getThreadControl().join(); } #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) _selector.finish(_workQueue.get()); #endif _selector.destroy(); } string IceInternal::ThreadPool::prefix() const { return _prefix; } void IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) { #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) ThreadPoolCurrent current(_instance, this); bool select = false; vector > handlers; while(true) { if(current._handler) { try { current._handler->message(current); } catch(ThreadPoolDestroyedException&) { return; } catch(const exception& ex) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } #ifdef ICE_OS_WINRT catch(Platform::Exception^ ex) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) << "\nevent handler: " << current._handler->toString(); } #endif catch(...) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); } } else if(select) { try { _selector.select(handlers, _serverIdleTime); } catch(SelectorTimeoutException&) { Lock sync(*this); if(!_destroyed && _inUse == 0) { _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out. } continue; } } { Lock sync(*this); if(!current._handler) { if(select) { _handlers.swap(handlers); _nextHandler = _handlers.begin(); _selector.finishSelect(); select = false; } else if(!current._leader && followerWait(thread, current)) { return; // Wait timed-out. } } else if(_sizeMax > 1) { if(!current._ioCompleted) { // // The handler didn't call ioCompleted() so we take care of decreasing // the IO thread count now. // --_inUseIO; } else { // // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // _selector.enable(current._handler.get(), current.operation); assert(_inUse > 0); --_inUse; } if(!current._leader && followerWait(thread, current)) { return; // Wait timed-out. } } // // Get the next ready handler. // if(_nextHandler != _handlers.end()) { current._ioCompleted = false; current._handler = _nextHandler->first; current.operation = _nextHandler->second; ++_nextHandler; } else { current._handler = 0; } if(!current._handler) { // // If there are no more ready handlers and there are still threads busy performing // IO, we give up leadership and promote another follower (which will perform the // select() only once all the IOs are completed). Otherwise, if there are no more // threads peforming IOs, it's time to do another select(). // if(_inUseIO > 0) { promoteFollower(current); } else { _selector.startSelect(); select = true; } } else if(_sizeMax > 1) { // // Increment the IO thread count and if there are still threads available // to perform IO and more handlers ready, we promote a follower. // ++_inUseIO; if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO) { promoteFollower(current); } } } } #else ThreadPoolCurrent current(_instance, this); while(true) { try { current._ioCompleted = false; current._handler = _selector.getNextHandler(current.operation, _threadIdleTime); } catch(const SelectorTimeoutException&) { if(_sizeMax > 1) { Lock sync(*this); if(_destroyed) { continue; } else if(_inUse < static_cast(_threads.size() - 1)) // If not the last idle thread, we can exit. { #ifndef ICE_OS_WINRT BOOL hasIO = false; GetThreadIOPendingFlag(GetCurrentThread(), &hasIO); if(hasIO) { continue; } #endif if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1); } _threads.erase(thread); _workQueue->queue(new JoinThreadWorkItem(thread)); return; } else if(_inUse > 0) { // // If this is the last idle thread but there are still other threads // busy dispatching, we go back waiting with _threadIdleTime. We only // wait with _serverIdleTime when there's only one thread left. // continue; } assert(_threads.size() == 1); } try { current._handler = _selector.getNextHandler(current.operation, _serverIdleTime); } catch(const SelectorTimeoutException&) { Lock sync(*this); if(!_destroyed) { _workQueue->queue(new ShutdownWorkItem(_instance)); } continue; } } assert(current._handler); try { current._handler->message(current); if(_sizeMax > 1 && current._ioCompleted) { Lock sync(*this); assert(_inUse > 0); --_inUse; } } catch(ThreadPoolDestroyedException&) { return; } catch(const exception& ex) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } #ifdef ICE_OS_WINRT catch(Platform::Exception^ ex) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) << "\nevent handler: " << current._handler->toString(); } #endif catch(...) { Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); } } #endif } bool IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) { current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. if(_sizeMax > 1) { IceUtil::Monitor::Lock sync(*this); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; if(_serialize && !_destroyed) { _selector.disable(current._handler.get(), current.operation); } if(current._leader) { // // If this thread is still the leader, it's time to promote a new leader. // promoteFollower(current); } else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0)) { notify(); } #endif assert(_inUse >= 0); ++_inUse; if(_inUse == _sizeWarn) { Warning out(_instance->initializationData().logger); out << "thread pool `" << _prefix << "' is running low on threads\n" << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; } if(!_destroyed) { assert(_inUse <= static_cast(_threads.size())); if(_inUse < _sizeMax && _inUse == static_cast(_threads.size())) { if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); out << "growing " << _prefix << ": Size=" << _threads.size() + 1; } try { IceUtil::ThreadPtr thread = new EventHandlerThread(this); if(_hasPriority) { thread->start(_stackSize, _priority); } else { thread->start(_stackSize); } _threads.insert(thread); } catch(const IceUtil::Exception& ex) { Error out(_instance->initializationData().logger); out << "cannot create thread for `" << _prefix << "':\n" << ex; } } } } return _serialize; } #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) bool IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) { assert(current._handler->_pending & current.operation); if(current._handler->_started & current.operation) { assert(!(current._handler->_ready & current.operation)); current._handler->_ready = static_cast(current._handler->_ready | current.operation); current._handler->_started = static_cast(current._handler->_started & ~current.operation); if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished. { current._handler->_pending = static_cast(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler)); _selector.finish(current._handler.get()); } return false; } } else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation)) { assert(!(current._handler->_started & current.operation)); if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler)); _selector.finish(current._handler.get()); } return false; } else { current._handler->_started = static_cast(current._handler->_started | current.operation); return false; } } if(current._handler->_registered & current.operation) { assert(current._handler->_ready & current.operation); current._handler->_ready = static_cast(current._handler->_ready & ~current.operation); return true; } else { current._handler->_pending = static_cast(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { _workQueue->queue(new FinishedWorkItem(current._handler)); _selector.finish(current._handler.get()); } return false; } } void IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) { if(current._handler->_registered & current.operation) { assert(!(current._handler->_ready & current.operation)); if(!current._handler->startAsync(current.operation)) { current._handler->_pending = static_cast(current._handler->_pending & ~current.operation); } else { assert(current._handler->_pending & current.operation); current._handler->_started = static_cast(current._handler->_started | current.operation); } } else { current._handler->_pending = static_cast(current._handler->_pending & ~current.operation); } if(!current._handler->_pending && current._handler->_finish) { // There are no more pending async operations, it's time to call finish. _workQueue->queue(new FinishedWorkItem(current._handler)); _selector.finish(current._handler.get()); } } #else void IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current) { assert(!_promote && current._leader); _promote = true; if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0)) { notify(); } current._leader = false; } bool IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current) { assert(!current._leader); // // It's important to clear the handler before waiting to make sure that // resources for the handler are released now if it's finished. We also // clear the per-thread stream. // current._handler = 0; current.stream.clear(); current.stream.b.clear(); // // Wait to be promoted and for all the IO threads to be done. // while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0)) { if(_threadIdleTime) { if(!timedWait(IceUtil::Time::seconds(_threadIdleTime))) { if(!_destroyed && (!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))) { if(_instance->traceLevels()->threadPool >= 1) { Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat); out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1); } assert(_threads.size() > 1); // Can only be called by a waiting follower thread. _threads.erase(thread); _workQueue->queue(new JoinThreadWorkItem(thread)); return true; } } } else { wait(); } } current._leader = true; // The current thread has become the leader. _promote = false; return false; } #endif IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : IceUtil::Thread(pool->_prefix + " thread"), _pool(pool) { } void IceInternal::ThreadPool::EventHandlerThread::run() { if(_pool->_instance->initializationData().threadHook) { try { _pool->_instance->initializationData().threadHook->start(); } catch(const exception& ex) { Error out(_pool->_instance->initializationData().logger); out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->initializationData().logger); out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'"; } } try { _pool->run(this); } catch(const exception& ex) { Error out(_pool->_instance->initializationData().logger); out << "exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->initializationData().logger); out << "unknown exception in `" << _pool->_prefix << "'"; } if(_pool->_instance->initializationData().threadHook) { try { _pool->_instance->initializationData().threadHook->stop(); } catch(const exception& ex) { Error out(_pool->_instance->initializationData().logger); out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex; } catch(...) { Error out(_pool->_instance->initializationData().logger); out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'"; } } _pool = 0; // Break cyclic dependency. } ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) : operation(SocketOperationNone), stream(instance.get(), Ice::currentProtocolEncoding), _threadPool(threadPool.get()), _ioCompleted(false) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) , _leader(false) #endif { }