diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-08-01 09:19:38 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-08-01 09:19:38 +0200 |
commit | fdadca3f4782c4fcc71ab86b091503855447e485 (patch) | |
tree | e18ae57cc00c82347daa2e51ccb55eaab8845bdc /cpp/src/Ice/ThreadPool.cpp | |
parent | Before RequestObserver change (diff) | |
download | ice-fdadca3f4782c4fcc71ab86b091503855447e485.tar.bz2 ice-fdadca3f4782c4fcc71ab86b091503855447e485.tar.xz ice-fdadca3f4782c4fcc71ab86b091503855447e485.zip |
Fixes
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 88 |
1 files changed, 68 insertions, 20 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index e1fd5ace7e6..025822a9fa0 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,6 +33,7 @@ using namespace std; using namespace Ice; +using namespace Ice::Instrumentation; using namespace IceInternal; ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; } @@ -498,7 +499,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { for(int i = 0 ; i < _size ; ++i) { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); + EventHandlerThreadPtr thread = new EventHandlerThread(this); if(_hasPriority) { thread->start(_stackSize, _priority); @@ -545,6 +546,16 @@ IceInternal::ThreadPool::destroy() } void +IceInternal::ThreadPool::updateObservers() +{ + Lock sync(*this); + for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + { + (*p)->updateObserver(); + } +} + +void IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) { Lock sync(*this); @@ -606,7 +617,7 @@ IceInternal::ThreadPool::joinWithAllThreads() // wouldn't be possible here anyway, because otherwise the other // threads would never terminate.) // - for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) { (*p)->getThreadControl().join(); } @@ -624,16 +635,17 @@ IceInternal::ThreadPool::prefix() const } void -IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) +IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - ThreadPoolCurrent current(_instance, this); + ThreadPoolCurrent current(_instance, this, thread); bool select = false; vector<pair<EventHandler*, SocketOperation> > handlers; while(true) { if(current._handler) { + thread->setState(ThreadStateInUseForIO); try { current._handler->message(current); @@ -648,14 +660,6 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } -#ifdef ICE_OS_WINRT - catch(Platform::Exception^ ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) - << "\nevent handler: " << current._handler->toString(); - } -#endif catch(...) { Error out(_instance->initializationData().logger); @@ -664,6 +668,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) } else if(select) { + thread->setState(ThreadStateIdle); try { _selector.select(handlers, _serverIdleTime); @@ -690,7 +695,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) _selector.finishSelect(); select = false; } - else if(!current._leader && followerWait(thread, current)) + else if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -716,7 +721,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) --_inUse; } - if(!current._leader && followerWait(thread, current)) + if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -877,6 +882,8 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) { current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. + current._thread->setState(ThreadStateInUseForUser); + if(_sizeMax > 1) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -924,7 +931,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) try { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); + EventHandlerThreadPtr thread = new EventHandlerThread(this); if(_hasPriority) { thread->start(_stackSize, _priority); @@ -1049,9 +1056,11 @@ IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current) } bool -IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current) +IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current) { assert(!current._leader); + + current._thread->setState(ThreadStateIdle); // // It's important to clear the handler before waiting to make sure that @@ -1080,8 +1089,8 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1); } assert(_threads.size() > 1); // Can only be called by a waiting follower thread. - _threads.erase(thread); - _workQueue->queue(new JoinThreadWorkItem(thread)); + _threads.erase(current._thread); + _workQueue->queue(new JoinThreadWorkItem(current._thread)); return true; } } @@ -1099,8 +1108,39 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : IceUtil::Thread(pool->_prefix + " thread"), - _pool(pool) + _pool(pool), + _state(Ice::Instrumentation::ThreadStateIdle) +{ + updateObserver(); +} + +void +IceInternal::ThreadPool::EventHandlerThread::updateObserver() +{ + const ObserverResolverPtr& resolver = _pool->_instance->initializationData().observerResolver; + if(resolver) + { + ostringstream os; + os << _pool->_prefix << '-' << this; + _observer = resolver->getThreadObserver(_pool->_prefix, os.str(), _state, _observer); + if(_observer) + { + _observer->attach(); + } + } +} + +void +IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s) { + if(_observer) + { + if(_state != s) + { + _observer->stateChanged(_state, s); + } + } + _state = s; } void @@ -1139,6 +1179,11 @@ IceInternal::ThreadPool::EventHandlerThread::run() out << "unknown exception in `" << _pool->_prefix << "'"; } + if(_observer) + { + _observer->detach(); + } + if(_pool->_instance->initializationData().threadHook) { try @@ -1160,10 +1205,13 @@ IceInternal::ThreadPool::EventHandlerThread::run() _pool = 0; // Break cyclic dependency. } -ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) : +ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, + const ThreadPoolPtr& threadPool, + const ThreadPool::EventHandlerThreadPtr& thread) : operation(SocketOperationNone), stream(instance.get(), Ice::currentProtocolEncoding), _threadPool(threadPool.get()), + _thread(thread), _ioCompleted(false) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) , _leader(false) |