diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-10-08 15:03:46 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-10-08 15:03:46 +0200 |
commit | d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae (patch) | |
tree | 7abffd29e98ff112cec85b658fab404961f8306a /cpp/src/Ice/ThreadPool.cpp | |
parent | FreeBSD port (diff) | |
parent | Win32 fixes (diff) | |
download | ice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.tar.bz2 ice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.tar.xz ice-d36ec7c740d5dfaa8e4ce2a2df6c9cb0818f01ae.zip |
Merge branch 'mx' into encoding11
Conflicts:
cpp/demo/Freeze/backup/.depend
cpp/demo/Freeze/backup/.depend.mak
cpp/demo/Freeze/bench/.depend
cpp/demo/Freeze/bench/.depend.mak
cpp/demo/Freeze/casino/.depend
cpp/demo/Freeze/casino/.depend.mak
cpp/demo/Freeze/customEvictor/.depend
cpp/demo/Freeze/customEvictor/.depend.mak
cpp/demo/Freeze/library/.depend
cpp/demo/Freeze/library/.depend.mak
cpp/demo/Freeze/phonebook/.depend
cpp/demo/Freeze/phonebook/.depend.mak
cpp/demo/Freeze/transform/.depend
cpp/demo/Freeze/transform/.depend.mak
cpp/demo/Glacier2/callback/.depend
cpp/demo/Glacier2/callback/.depend.mak
cpp/demo/Glacier2/chat/.depend
cpp/demo/Glacier2/chat/.depend.mak
cpp/demo/Ice/async/.depend
cpp/demo/Ice/async/.depend.mak
cpp/demo/Ice/bidir/.depend
cpp/demo/Ice/bidir/.depend.mak
cpp/demo/Ice/callback/.depend
cpp/demo/Ice/callback/.depend.mak
cpp/demo/Ice/converter/.depend
cpp/demo/Ice/converter/.depend.mak
cpp/demo/Ice/hello/.depend
cpp/demo/Ice/hello/.depend.mak
cpp/demo/Ice/interleaved/.depend
cpp/demo/Ice/interleaved/.depend.mak
cpp/demo/Ice/invoke/.depend
cpp/demo/Ice/invoke/.depend.mak
cpp/demo/Ice/latency/.depend
cpp/demo/Ice/latency/.depend.mak
cpp/demo/Ice/minimal/.depend
cpp/demo/Ice/minimal/.depend.mak
cpp/demo/Ice/multicast/.depend
cpp/demo/Ice/multicast/.depend.mak
cpp/demo/Ice/nested/.depend
cpp/demo/Ice/nested/.depend.mak
cpp/demo/Ice/nrvo/.depend
cpp/demo/Ice/nrvo/.depend.mak
cpp/demo/Ice/plugin/.depend
cpp/demo/Ice/plugin/.depend.mak
cpp/demo/Ice/session/.depend
cpp/demo/Ice/session/.depend.mak
cpp/demo/Ice/throughput/.depend
cpp/demo/Ice/throughput/.depend.mak
cpp/demo/Ice/value/.depend
cpp/demo/Ice/value/.depend.mak
cpp/demo/IceBox/hello/.depend
cpp/demo/IceBox/hello/.depend.mak
cpp/demo/IceGrid/allocate/.depend
cpp/demo/IceGrid/allocate/.depend.mak
cpp/demo/IceGrid/icebox/.depend
cpp/demo/IceGrid/icebox/.depend.mak
cpp/demo/IceGrid/replication/.depend
cpp/demo/IceGrid/replication/.depend.mak
cpp/demo/IceGrid/secure/.depend
cpp/demo/IceGrid/secure/.depend.mak
cpp/demo/IceGrid/sessionActivation/.depend
cpp/demo/IceGrid/sessionActivation/.depend.mak
cpp/demo/IceGrid/simple/.depend
cpp/demo/IceGrid/simple/.depend.mak
cpp/demo/IceStorm/clock/.depend
cpp/demo/IceStorm/clock/.depend.mak
cpp/demo/IceStorm/counter/.depend
cpp/demo/IceStorm/counter/.depend.mak
cpp/demo/IceStorm/replicated/.depend
cpp/demo/IceStorm/replicated/.depend.mak
cpp/demo/IceStorm/replicated2/.depend
cpp/demo/IceStorm/replicated2/.depend.mak
cpp/demo/book/evictor_filesystem/.depend
cpp/demo/book/evictor_filesystem/.depend.mak
cpp/demo/book/lifecycle/.depend
cpp/demo/book/lifecycle/.depend.mak
cpp/demo/book/map_filesystem/.depend
cpp/demo/book/map_filesystem/.depend.mak
cpp/demo/book/printer/.depend
cpp/demo/book/printer/.depend.mak
cpp/demo/book/simple_filesystem/.depend
cpp/demo/book/simple_filesystem/.depend.mak
cpp/include/Ice/Outgoing.h
cpp/include/Ice/OutgoingAsync.h
cpp/include/Ice/StreamTraits.h
cpp/src/Freeze/.depend
cpp/src/Freeze/.depend.mak
cpp/src/FreezeScript/.depend
cpp/src/FreezeScript/.depend.mak
cpp/src/Glacier2/.depend
cpp/src/Glacier2/.depend.mak
cpp/src/Glacier2Lib/.depend
cpp/src/Glacier2Lib/.depend.mak
cpp/src/Ice/.depend
cpp/src/Ice/.depend.mak
cpp/src/IceBox/.depend
cpp/src/IceBox/.depend.mak
cpp/src/IceDB/.depend
cpp/src/IceDB/.depend.mak
cpp/src/IceGrid/.depend
cpp/src/IceGrid/.depend.mak
cpp/src/IceGrid/FreezeDB/.depend
cpp/src/IceGrid/FreezeDB/.depend.mak
cpp/src/IceGrid/ServerCache.h
cpp/src/IceGrid/ServerI.h
cpp/src/IceGridLib/.depend
cpp/src/IceGridLib/.depend.mak
cpp/src/IcePatch2/.depend
cpp/src/IcePatch2/.depend.mak
cpp/src/IcePatch2Lib/.depend
cpp/src/IcePatch2Lib/.depend.mak
cpp/src/IceSSL/.depend
cpp/src/IceSSL/.depend.mak
cpp/src/IceStorm/.depend
cpp/src/IceStorm/.depend.mak
cpp/src/IceStorm/FreezeDB/.depend
cpp/src/IceStorm/FreezeDB/.depend.mak
cpp/src/IceStormLib/.depend
cpp/src/IceStormLib/.depend.mak
cpp/src/slice2cpp/Gen.cpp
cpp/test/Freeze/complex/.depend
cpp/test/Freeze/complex/.depend.mak
cpp/test/Freeze/dbmap/.depend
cpp/test/Freeze/dbmap/.depend.mak
cpp/test/Freeze/evictor/.depend
cpp/test/Freeze/evictor/.depend.mak
cpp/test/Freeze/fileLock/.depend
cpp/test/Freeze/fileLock/.depend.mak
cpp/test/FreezeScript/dbmap/.depend
cpp/test/FreezeScript/dbmap/.depend.mak
cpp/test/FreezeScript/evictor/.depend
cpp/test/FreezeScript/evictor/.depend.mak
cpp/test/Glacier2/attack/.depend
cpp/test/Glacier2/attack/.depend.mak
cpp/test/Glacier2/dynamicFiltering/.depend
cpp/test/Glacier2/dynamicFiltering/.depend.mak
cpp/test/Glacier2/override/.depend
cpp/test/Glacier2/override/.depend.mak
cpp/test/Glacier2/router/.depend
cpp/test/Glacier2/router/.depend.mak
cpp/test/Glacier2/sessionControl/.depend
cpp/test/Glacier2/sessionControl/.depend.mak
cpp/test/Glacier2/sessionHelper/.depend
cpp/test/Glacier2/sessionHelper/.depend.mak
cpp/test/Glacier2/ssl/.depend
cpp/test/Glacier2/ssl/.depend.mak
cpp/test/Glacier2/staticFiltering/.depend
cpp/test/Glacier2/staticFiltering/.depend.mak
cpp/test/Ice/adapterDeactivation/.depend
cpp/test/Ice/adapterDeactivation/.depend.mak
cpp/test/Ice/ami/.depend
cpp/test/Ice/ami/.depend.mak
cpp/test/Ice/background/.depend
cpp/test/Ice/background/.depend.mak
cpp/test/Ice/binding/.depend
cpp/test/Ice/binding/.depend.mak
cpp/test/Ice/checksum/.depend
cpp/test/Ice/checksum/.depend.mak
cpp/test/Ice/checksum/server/.depend
cpp/test/Ice/checksum/server/.depend.mak
cpp/test/Ice/custom/.depend
cpp/test/Ice/custom/.depend.mak
cpp/test/Ice/defaultServant/.depend
cpp/test/Ice/defaultServant/.depend.mak
cpp/test/Ice/defaultValue/.depend
cpp/test/Ice/defaultValue/.depend.mak
cpp/test/Ice/dispatcher/.depend
cpp/test/Ice/dispatcher/.depend.mak
cpp/test/Ice/exceptions/.depend
cpp/test/Ice/exceptions/.depend.mak
cpp/test/Ice/facets/.depend
cpp/test/Ice/facets/.depend.mak
cpp/test/Ice/faultTolerance/.depend
cpp/test/Ice/faultTolerance/.depend.mak
cpp/test/Ice/gc/.depend
cpp/test/Ice/gc/.depend.mak
cpp/test/Ice/hash/.depend
cpp/test/Ice/hash/.depend.mak
cpp/test/Ice/hold/.depend
cpp/test/Ice/hold/.depend.mak
cpp/test/Ice/info/.depend
cpp/test/Ice/info/.depend.mak
cpp/test/Ice/inheritance/.depend
cpp/test/Ice/inheritance/.depend.mak
cpp/test/Ice/interceptor/.depend
cpp/test/Ice/interceptor/.depend.mak
cpp/test/Ice/invoke/.depend
cpp/test/Ice/invoke/.depend.mak
cpp/test/Ice/location/.depend
cpp/test/Ice/location/.depend.mak
cpp/test/Ice/objects/.depend
cpp/test/Ice/objects/.depend.mak
cpp/test/Ice/operations/.depend
cpp/test/Ice/operations/.depend.mak
cpp/test/Ice/plugin/.depend
cpp/test/Ice/plugin/.depend.mak
cpp/test/Ice/properties/.depend
cpp/test/Ice/properties/.depend.mak
cpp/test/Ice/proxy/.depend
cpp/test/Ice/proxy/.depend.mak
cpp/test/Ice/retry/.depend
cpp/test/Ice/retry/.depend.mak
cpp/test/Ice/servantLocator/.depend
cpp/test/Ice/servantLocator/.depend.mak
cpp/test/Ice/slicing/exceptions/.depend
cpp/test/Ice/slicing/exceptions/.depend.mak
cpp/test/Ice/slicing/objects/.depend
cpp/test/Ice/slicing/objects/.depend.mak
cpp/test/Ice/stream/.depend
cpp/test/Ice/stream/.depend.mak
cpp/test/Ice/stringConverter/.depend
cpp/test/Ice/stringConverter/.depend.mak
cpp/test/Ice/threadPoolPriority/.depend
cpp/test/Ice/threadPoolPriority/.depend.mak
cpp/test/Ice/timeout/.depend
cpp/test/Ice/timeout/.depend.mak
cpp/test/Ice/udp/.depend
cpp/test/Ice/udp/.depend.mak
cpp/test/IceBox/configuration/.depend
cpp/test/IceBox/configuration/.depend.mak
cpp/test/IceGrid/activation/.depend
cpp/test/IceGrid/activation/.depend.mak
cpp/test/IceGrid/admin/.depend
cpp/test/IceGrid/admin/.depend.mak
cpp/test/IceGrid/allocation/.depend
cpp/test/IceGrid/allocation/.depend.mak
cpp/test/IceGrid/deployer/.depend
cpp/test/IceGrid/deployer/.depend.mak
cpp/test/IceGrid/distribution/.depend
cpp/test/IceGrid/distribution/.depend.mak
cpp/test/IceGrid/replicaGroup/.depend
cpp/test/IceGrid/replicaGroup/.depend.mak
cpp/test/IceGrid/replication/.depend
cpp/test/IceGrid/replication/.depend.mak
cpp/test/IceGrid/session/.depend
cpp/test/IceGrid/session/.depend.mak
cpp/test/IceGrid/simple/.depend
cpp/test/IceGrid/simple/.depend.mak
cpp/test/IceGrid/update/.depend
cpp/test/IceGrid/update/.depend.mak
cpp/test/IceSSL/configuration/.depend
cpp/test/IceSSL/configuration/.depend.mak
cpp/test/IceStorm/federation/.depend
cpp/test/IceStorm/federation/.depend.mak
cpp/test/IceStorm/federation2/.depend
cpp/test/IceStorm/federation2/.depend.mak
cpp/test/IceStorm/rep1/.depend
cpp/test/IceStorm/rep1/.depend.mak
cpp/test/IceStorm/repgrid/.depend
cpp/test/IceStorm/repgrid/.depend.mak
cpp/test/IceStorm/repstress/.depend
cpp/test/IceStorm/repstress/.depend.mak
cpp/test/IceStorm/single/.depend
cpp/test/IceStorm/single/.depend.mak
cpp/test/IceStorm/stress/.depend
cpp/test/IceStorm/stress/.depend.mak
cpp/test/Slice/keyword/.depend
cpp/test/Slice/keyword/.depend.mak
cpp/test/Slice/parser/.depend
cpp/test/Slice/parser/.depend.mak
cpp/test/Slice/structure/.depend
cpp/test/Slice/structure/.depend.mak
py/modules/IcePy/.depend
py/modules/IcePy/.depend.mak
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 94 |
1 files changed, 71 insertions, 23 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 733adb3df1e..d6114a45071 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,6 +33,7 @@ using namespace std; using namespace Ice; +using namespace Ice::Instrumentation; using namespace IceInternal; ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; } @@ -370,6 +371,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _destroyed(false), _prefix(prefix), _selector(instance), + _nextThreadId(0), _size(0), _sizeIO(0), _sizeMax(0), @@ -492,7 +494,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p { for(int i = 0 ; i < _size ; ++i) { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); + EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId()); if(_hasPriority) { thread->start(_stackSize, _priority); @@ -539,6 +541,16 @@ IceInternal::ThreadPool::destroy() } void +IceInternal::ThreadPool::updateObservers() +{ + Lock sync(*this); + for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + { + (*p)->updateObserver(); + } +} + +void IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler) { Lock sync(*this); @@ -600,7 +612,7 @@ IceInternal::ThreadPool::joinWithAllThreads() // wouldn't be possible here anyway, because otherwise the other // threads would never terminate.) // - for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) + for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p) { (*p)->getThreadControl().join(); } @@ -618,16 +630,17 @@ IceInternal::ThreadPool::prefix() const } void -IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) +IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - ThreadPoolCurrent current(_instance, this); + ThreadPoolCurrent current(_instance, this, thread); bool select = false; vector<pair<EventHandler*, SocketOperation> > handlers; while(true) { if(current._handler) { + thread->setState(ThreadStateInUseForIO); try { current._handler->message(current); @@ -642,14 +655,6 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString(); } -#ifdef ICE_OS_WINRT - catch(Platform::Exception^ ex) - { - Error out(_instance->initializationData().logger); - out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data()) - << "\nevent handler: " << current._handler->toString(); - } -#endif catch(...) { Error out(_instance->initializationData().logger); @@ -658,6 +663,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) } else if(select) { + thread->setState(ThreadStateIdle); try { _selector.select(handlers, _serverIdleTime); @@ -684,7 +690,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) _selector.finishSelect(); select = false; } - else if(!current._leader && followerWait(thread, current)) + else if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -710,7 +716,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) --_inUse; } - if(!current._leader && followerWait(thread, current)) + if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -764,7 +770,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread) } } #else - ThreadPoolCurrent current(_instance, this); + ThreadPoolCurrent current(_instance, this, thread); while(true) { try @@ -871,6 +877,8 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) { current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. + current._thread->setState(ThreadStateInUseForUser); + if(_sizeMax > 1) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -918,7 +926,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) try { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); + EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId()); if(_hasPriority) { thread->start(_stackSize, _priority); @@ -1043,9 +1051,11 @@ IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current) } bool -IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current) +IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current) { assert(!current._leader); + + current._thread->setState(ThreadStateIdle); // // It's important to clear the handler before waiting to make sure that @@ -1074,8 +1084,8 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1); } assert(_threads.size() > 1); // Can only be called by a waiting follower thread. - _threads.erase(thread); - _workQueue->queue(new JoinThreadWorkItem(thread)); + _threads.erase(current._thread); + _workQueue->queue(new JoinThreadWorkItem(current._thread)); return true; } } @@ -1091,10 +1101,43 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo } #endif -IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) : - IceUtil::Thread(pool->_prefix + " thread"), - _pool(pool) +string +IceInternal::ThreadPool::nextThreadId() { + ostringstream os; + os << _prefix << "-" << _nextThreadId++; + return os.str(); +} + +IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool, const string& name) : + IceUtil::Thread(name), + _pool(pool), + _state(Ice::Instrumentation::ThreadStateIdle) +{ + updateObserver(); +} + +void +IceInternal::ThreadPool::EventHandlerThread::updateObserver() +{ + const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer; + if(obsv) + { + _observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get())); + } +} + +void +IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s) +{ + if(_observer) + { + if(_state != s) + { + _observer->stateChanged(_state, s); + } + } + _state = s; } void @@ -1133,6 +1176,8 @@ IceInternal::ThreadPool::EventHandlerThread::run() out << "unknown exception in `" << _pool->_prefix << "'"; } + _observer.detach(); + if(_pool->_instance->initializationData().threadHook) { try @@ -1154,10 +1199,13 @@ IceInternal::ThreadPool::EventHandlerThread::run() _pool = 0; // Break cyclic dependency. } -ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) : +ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, + const ThreadPoolPtr& threadPool, + const ThreadPool::EventHandlerThreadPtr& thread) : operation(SocketOperationNone), stream(instance.get(), Ice::currentProtocolEncoding), _threadPool(threadPool.get()), + _thread(thread), _ioCompleted(false) #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) , _leader(false) |