summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2012-08-01 09:19:38 +0200
committerBenoit Foucher <benoit@zeroc.com>2012-08-01 09:19:38 +0200
commitfdadca3f4782c4fcc71ab86b091503855447e485 (patch)
treee18ae57cc00c82347daa2e51ccb55eaab8845bdc /cpp/src/Ice/ThreadPool.cpp
parentBefore RequestObserver change (diff)
downloadice-fdadca3f4782c4fcc71ab86b091503855447e485.tar.bz2
ice-fdadca3f4782c4fcc71ab86b091503855447e485.tar.xz
ice-fdadca3f4782c4fcc71ab86b091503855447e485.zip
Fixes
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp88
1 files changed, 68 insertions, 20 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index e1fd5ace7e6..025822a9fa0 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -33,6 +33,7 @@
using namespace std;
using namespace Ice;
+using namespace Ice::Instrumentation;
using namespace IceInternal;
ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; }
@@ -498,7 +499,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
{
for(int i = 0 ; i < _size ; ++i)
{
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ EventHandlerThreadPtr thread = new EventHandlerThread(this);
if(_hasPriority)
{
thread->start(_stackSize, _priority);
@@ -545,6 +546,16 @@ IceInternal::ThreadPool::destroy()
}
void
+IceInternal::ThreadPool::updateObservers()
+{
+ Lock sync(*this);
+ for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
+ {
+ (*p)->updateObserver();
+ }
+}
+
+void
IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
{
Lock sync(*this);
@@ -606,7 +617,7 @@ IceInternal::ThreadPool::joinWithAllThreads()
// wouldn't be possible here anyway, because otherwise the other
// threads would never terminate.)
//
- for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
+ for(set<EventHandlerThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
{
(*p)->getThreadControl().join();
}
@@ -624,16 +635,17 @@ IceInternal::ThreadPool::prefix() const
}
void
-IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
+IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- ThreadPoolCurrent current(_instance, this);
+ ThreadPoolCurrent current(_instance, this, thread);
bool select = false;
vector<pair<EventHandler*, SocketOperation> > handlers;
while(true)
{
if(current._handler)
{
+ thread->setState(ThreadStateInUseForIO);
try
{
current._handler->message(current);
@@ -648,14 +660,6 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
<< current._handler->toString();
}
-#ifdef ICE_OS_WINRT
- catch(Platform::Exception^ ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "':\n" << IceUtil::wstringToString(ex->Message->Data())
- << "\nevent handler: " << current._handler->toString();
- }
-#endif
catch(...)
{
Error out(_instance->initializationData().logger);
@@ -664,6 +668,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
}
else if(select)
{
+ thread->setState(ThreadStateIdle);
try
{
_selector.select(handlers, _serverIdleTime);
@@ -690,7 +695,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
_selector.finishSelect();
select = false;
}
- else if(!current._leader && followerWait(thread, current))
+ else if(!current._leader && followerWait(current))
{
return; // Wait timed-out.
}
@@ -716,7 +721,7 @@ IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
--_inUse;
}
- if(!current._leader && followerWait(thread, current))
+ if(!current._leader && followerWait(current))
{
return; // Wait timed-out.
}
@@ -877,6 +882,8 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
{
current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called.
+ current._thread->setState(ThreadStateInUseForUser);
+
if(_sizeMax > 1)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -924,7 +931,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
try
{
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ EventHandlerThreadPtr thread = new EventHandlerThread(this);
if(_hasPriority)
{
thread->start(_stackSize, _priority);
@@ -1049,9 +1056,11 @@ IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current)
}
bool
-IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current)
+IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current)
{
assert(!current._leader);
+
+ current._thread->setState(ThreadStateIdle);
//
// It's important to clear the handler before waiting to make sure that
@@ -1080,8 +1089,8 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo
out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1);
}
assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
- _threads.erase(thread);
- _workQueue->queue(new JoinThreadWorkItem(thread));
+ _threads.erase(current._thread);
+ _workQueue->queue(new JoinThreadWorkItem(current._thread));
return true;
}
}
@@ -1099,8 +1108,39 @@ IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPo
IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) :
IceUtil::Thread(pool->_prefix + " thread"),
- _pool(pool)
+ _pool(pool),
+ _state(Ice::Instrumentation::ThreadStateIdle)
+{
+ updateObserver();
+}
+
+void
+IceInternal::ThreadPool::EventHandlerThread::updateObserver()
+{
+ const ObserverResolverPtr& resolver = _pool->_instance->initializationData().observerResolver;
+ if(resolver)
+ {
+ ostringstream os;
+ os << _pool->_prefix << '-' << this;
+ _observer = resolver->getThreadObserver(_pool->_prefix, os.str(), _state, _observer);
+ if(_observer)
+ {
+ _observer->attach();
+ }
+ }
+}
+
+void
+IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s)
{
+ if(_observer)
+ {
+ if(_state != s)
+ {
+ _observer->stateChanged(_state, s);
+ }
+ }
+ _state = s;
}
void
@@ -1139,6 +1179,11 @@ IceInternal::ThreadPool::EventHandlerThread::run()
out << "unknown exception in `" << _pool->_prefix << "'";
}
+ if(_observer)
+ {
+ _observer->detach();
+ }
+
if(_pool->_instance->initializationData().threadHook)
{
try
@@ -1160,10 +1205,13 @@ IceInternal::ThreadPool::EventHandlerThread::run()
_pool = 0; // Break cyclic dependency.
}
-ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) :
+ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance,
+ const ThreadPoolPtr& threadPool,
+ const ThreadPool::EventHandlerThreadPtr& thread) :
operation(SocketOperationNone),
stream(instance.get(), Ice::currentProtocolEncoding),
_threadPool(threadPool.get()),
+ _thread(thread),
_ioCompleted(false)
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
, _leader(false)