Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--  cpp/src/Ice/ThreadPool.cpp  1140
1 files changed, 1140 insertions, 0 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
new file mode 100644
index 00000000000..1f9a3342576
--- /dev/null
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -0,0 +1,1140 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+//
+// The following is required for GetThreadIOPendingFlag
+//
+#if defined(_WIN32) && !defined(_WIN32_WINNT)
+# define _WIN32_WINNT 0x0501
+#endif
+
+#include <IceUtil/DisableWarnings.h>
+#include <Ice/ThreadPool.h>
+#include <Ice/EventHandler.h>
+#include <Ice/Network.h>
+#include <Ice/LocalException.h>
+#include <Ice/Instance.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Protocol.h>
+#include <Ice/ObjectAdapterFactory.h>
+#include <Ice/Properties.h>
+#include <Ice/TraceLevels.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; }
+ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPoolWorkItem* p) { return p; }
+
+namespace
+{
+
+class ShutdownWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ ShutdownWorkItem(const InstancePtr& instance) : _instance(instance)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ current.ioCompleted();
+ try
+ {
+ _instance->objectAdapterFactory()->shutdown();
+ }
+ catch(const CommunicatorDestroyedException&)
+ {
+ }
+ }
+
+private:
+
+ const InstancePtr _instance;
+};
+
+class FinishedWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ _handler->finished(current);
+ }
+
+private:
+
+ const EventHandlerPtr _handler;
+};
+
+class JoinThreadWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent&)
+ {
+        // No call to ioCompleted(): this shouldn't block (and we don't want
+        // to cause a new thread to be started).
+ _thread->getThreadControl().join();
+ }
+
+private:
+
+ IceUtil::ThreadPtr _thread;
+};
+
+//
+// Exception raised by the thread pool work queue when the thread pool
+// is destroyed.
+//
+class ThreadPoolDestroyedException
+{
+};
+
+}
+
+IceInternal::DispatchWorkItem::DispatchWorkItem(const InstancePtr& instance) : _instance(instance)
+{
+}
+
+void
+IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
+{
+ Ice::DispatcherPtr dispatcher = _instance->initializationData().dispatcher;
+ if(dispatcher)
+ {
+ try
+ {
+ dispatcher->dispatch(this, 0);
+ }
+ catch(const std::exception& ex)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\n" << ex;
+ }
+ }
+ catch(...)
+ {
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "dispatch exception:\nunknown c++ exception";
+ }
+ }
+ }
+ else
+ {
+ current.ioCompleted(); // Promote a follower.
+ run();
+ }
+}
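+
+// For illustration only (not part of this file): DispatchWorkItem hands the
+// dispatch to a custom Ice::Dispatcher when one is configured. A minimal
+// sketch of such a dispatcher, assuming the Ice 3.4 Dispatcher interface
+// (ExampleDispatcher is a hypothetical name):
+//
+//   class ExampleDispatcher : public Ice::Dispatcher
+//   {
+//   public:
+//       virtual void
+//       dispatch(const Ice::DispatcherCallPtr& call, const Ice::ConnectionPtr&)
+//       {
+//           call->run(); // Run the dispatch right away on the calling thread.
+//       }
+//   };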
+
+IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool* threadPool,
+ const InstancePtr& instance,
+ Selector& selector) :
+ _threadPool(threadPool),
+ _instance(instance),
+ _selector(selector),
+ _destroyed(false)
+#ifdef ICE_USE_IOCP
+ , _info(SocketOperationRead)
+#endif
+{
+#ifndef ICE_USE_IOCP
+ SOCKET fds[2];
+ createPipe(fds);
+ _fdIntrRead = fds[0];
+ _fdIntrWrite = fds[1];
+
+ _selector.initialize(this);
+ _selector.update(this, SocketOperationNone, SocketOperationRead);
+#endif
+}
+
+IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
+{
+ assert(_destroyed);
+
+#ifndef ICE_USE_IOCP
+ try
+ {
+ closeSocket(_fdIntrRead);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+
+ try
+ {
+ closeSocket(_fdIntrWrite);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+#endif
+}
+
+void
+IceInternal::ThreadPoolWorkQueue::destroy()
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+ _destroyed = true;
+ postMessage();
+}
+
+void
+IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ _workItems.push_back(item);
+#ifndef ICE_USE_IOCP
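+    //
+    // With the select-based implementation a single byte on the interrupt
+    // pipe wakes the selector for the whole batch of queued items, so we
+    // only post when the queue goes from empty to non-empty. With IOCP
+    // (below), every queued item gets its own completion status.
+    //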
+ if(_workItems.size() == 1)
+ {
+ postMessage();
+ }
+#else
+ postMessage();
+#endif
+}
+
+#ifdef ICE_USE_IOCP
+bool
+IceInternal::ThreadPoolWorkQueue::startAsync(SocketOperation)
+{
+ assert(false);
+ return false;
+}
+
+bool
+IceInternal::ThreadPoolWorkQueue::finishAsync(SocketOperation)
+{
+ assert(false);
+ return false;
+}
+#endif
+
+void
+IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
+{
+ ThreadPoolWorkItemPtr workItem;
+ {
+ Lock sync(*this);
+ if(!_workItems.empty())
+ {
+ workItem = _workItems.front();
+ _workItems.pop_front();
+
+#ifndef ICE_USE_IOCP
+ if(_workItems.empty())
+ {
+ char c;
+ while(true)
+ {
+ ssize_t ret;
+#ifdef _WIN32
+ ret = ::recv(_fdIntrRead, &c, 1, 0);
+#else
+ ret = ::read(_fdIntrRead, &c, 1);
+#endif
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+ }
+#endif
+ }
+ else
+ {
+ assert(_destroyed);
+#ifdef ICE_USE_IOCP
+ postMessage();
+#endif
+ }
+ }
+
+ if(workItem)
+ {
+ workItem->execute(current);
+ }
+ else
+ {
+ current.ioCompleted();
+ throw ThreadPoolDestroyedException();
+ }
+}
+
+void
+IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&)
+{
+ assert(false);
+}
+
+string
+IceInternal::ThreadPoolWorkQueue::toString() const
+{
+ return "work queue";
+}
+
+NativeInfoPtr
+IceInternal::ThreadPoolWorkQueue::getNativeInfo()
+{
+#ifndef ICE_USE_IOCP
+ return new NativeInfo(_fdIntrRead);
+#endif
+ return 0;
+}
+
+void
+IceInternal::ThreadPoolWorkQueue::postMessage()
+{
+#ifndef ICE_USE_IOCP
+ char c = 0;
+ while(true)
+ {
+#ifdef _WIN32
+ if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+#else
+ if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
+#endif
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+#else
+ if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this),
+#if defined(_MSC_VER) && (_MSC_VER < 1300) // COMPILER FIX: VC60
+ reinterpret_cast<LPOVERLAPPED>(&_info)
+#else
+ &_info
+#endif
+ ))
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
+ }
+#endif
+}
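+
+// For illustration only: the pipe-based branch above is the classic
+// "self-pipe trick". A standalone POSIX sketch (hypothetical, not part of
+// this file):
+//
+//   int fds[2];
+//   pipe(fds);                          // fds[0] read end, fds[1] write end
+//   // Register fds[0] with select()/poll() next to the real sockets.
+//   // To wake the selector thread, write a single byte:
+//   char c = 0;
+//   while(write(fds[1], &c, 1) == -1 && errno == EINTR)
+//       ;                               // Retry if interrupted by a signal.
+//   // The selector wakes up with fds[0] readable and drains the byte
+//   // before checking the work queue.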
+
+IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
+ _instance(instance),
+ _destroyed(false),
+ _prefix(prefix),
+ _selector(instance),
+ _size(0),
+ _sizeIO(0),
+ _sizeMax(0),
+ _sizeWarn(0),
+ _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0),
+ _hasPriority(false),
+ _priority(0),
+ _serverIdleTime(timeout),
+ _threadIdleTime(0),
+ _stackSize(0),
+ _inUse(0),
+#ifndef ICE_USE_IOCP
+ _inUseIO(0),
+ _nextHandler(_handlers.end()),
+#endif
+ _promote(true)
+{
+ PropertiesPtr properties = _instance->initializationData().properties;
+
+#ifdef _WIN32
+ SYSTEM_INFO sysInfo;
+ GetSystemInfo(&sysInfo);
+ int nProcessors = sysInfo.dwNumberOfProcessors;
+#else
+ int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
+#endif
+
+ //
+ // We use just one thread as the default. This is the fastest
+ // possible setting, still allows one level of nesting, and
+    // doesn't require making the servants thread-safe.
+ //
+ int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ if(size < 1)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".Size < 1; Size adjusted to 1";
+ size = 1;
+ }
+
+ int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax == -1)
+ {
+ sizeMax = nProcessors;
+ }
+ if(sizeMax < size)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
+ sizeMax = size;
+ }
+
+ int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn");
+ if(sizeWarn != 0 && sizeWarn < size)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".SizeWarn < " << _prefix << ".Size; adjusted SizeWarn to Size (" << size << ")";
+ sizeWarn = size;
+ }
+ else if(sizeWarn > sizeMax)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")";
+ sizeWarn = sizeMax;
+ }
+
+ int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+ if(threadIdleTime < 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+ threadIdleTime = 0;
+ }
+
+ const_cast<int&>(_size) = size;
+ const_cast<int&>(_sizeMax) = sizeMax;
+ const_cast<int&>(_sizeWarn) = sizeWarn;
+ const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors);
+ const_cast<int&>(_threadIdleTime) = threadIdleTime;
+
+#ifdef ICE_USE_IOCP
+ _selector.setup(_sizeIO);
+#endif
+
+ int stackSize = properties->getPropertyAsInt(_prefix + ".StackSize");
+ if(stackSize < 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".StackSize < 0; Size adjusted to OS default";
+ stackSize = 0;
+ }
+ const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
+
+ const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
+ const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
+ if(!_hasPriority)
+ {
+ const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
+ const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
+ }
+
+ _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "creating " << _prefix << ": Size = " << _size << ", SizeMax = " << _sizeMax << ", SizeWarn = "
+ << _sizeWarn;
+ }
+
+ __setNoDelete(true);
+ try
+ {
+        for(int i = 0; i < _size; ++i)
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ if(_hasPriority)
+ {
+ thread->start(_stackSize, _priority);
+ }
+ else
+ {
+ thread->start(_stackSize);
+ }
+ _threads.insert(thread);
+ }
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
+
+ destroy();
+ joinWithAllThreads();
+ __setNoDelete(false);
+ throw;
+ }
+ catch(...)
+ {
+ __setNoDelete(false);
+ throw;
+ }
+ __setNoDelete(false);
+}
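+
+// For illustration only: the properties read above come from the regular
+// Ice configuration mechanism. A configuration-file sketch for the server
+// thread pool (the values are made up):
+//
+//   Ice.ThreadPool.Server.Size=1             # threads started up front
+//   Ice.ThreadPool.Server.SizeMax=10         # upper bound for growth
+//   Ice.ThreadPool.Server.SizeWarn=8         # warn when 8 threads are in use
+//   Ice.ThreadPool.Server.ThreadIdleTime=60  # shrink idle threads after 60s
+//   Ice.ThreadPool.Server.Serialize=1        # serialize events per handler
+//   Ice.ThreadPool.Server.StackSize=0        # 0 = OS default stack size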
+
+IceInternal::ThreadPool::~ThreadPool()
+{
+ assert(_destroyed);
+}
+
+void
+IceInternal::ThreadPool::destroy()
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+ _destroyed = true;
+ _workQueue->destroy();
+}
+
+void
+IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+ _selector.initialize(handler.get());
+}
+
+void
+IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add)
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+ _selector.update(handler.get(), remove, add);
+}
+
+void
+IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
+{
+ Lock sync(*this);
+ assert(!_destroyed);
+#ifndef ICE_USE_IOCP
+    _selector.finish(handler.get()); // This must be called before queuing the FinishedWorkItem below.
+ _workQueue->queue(new FinishedWorkItem(handler));
+
+ //
+    // Clear the current ready handlers. This vector holds raw pointers that
+    // aren't reference counted, and a handler might be destroyed once it's
+    // finished.
+ //
+ _handlers.clear();
+ _nextHandler = _handlers.end();
+#else
+ // If there are no pending asynchronous operations, we can call finish on the handler now.
+ if(!handler->_pending)
+ {
+ _workQueue->queue(new FinishedWorkItem(handler));
+ _selector.finish(handler.get());
+ }
+ else
+ {
+ handler->_finish = true;
+ }
+#endif
+}
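+
+// Note (illustration): under IOCP a handler with outstanding asynchronous
+// operations can't be finished right away. The _finish flag marks it, and
+// startMessage()/finishMessage() below perform the deferred cleanup once
+// _pending becomes empty.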
+
+void
+IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem)
+{
+ _workQueue->queue(workItem);
+}
+
+void
+IceInternal::ThreadPool::joinWithAllThreads()
+{
+ assert(_destroyed);
+
+ //
+ // _threads is immutable after destroy() has been called,
+ // therefore no synchronization is needed. (Synchronization
+ // wouldn't be possible here anyway, because otherwise the other
+ // threads would never terminate.)
+ //
+ for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
+ {
+ (*p)->getThreadControl().join();
+ }
+
+#ifndef ICE_USE_IOCP
+ _selector.finish(_workQueue.get());
+#endif
+ _selector.destroy();
+}
+
+string
+IceInternal::ThreadPool::prefix() const
+{
+ return _prefix;
+}
+
+void
+IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
+{
+#ifndef ICE_USE_IOCP
+ ThreadPoolCurrent current(_instance, this);
+ bool select = false;
+ vector<pair<EventHandler*, SocketOperation> > handlers;
+ while(true)
+ {
+ if(current._handler)
+ {
+ try
+ {
+ current._handler->message(current);
+ }
+ catch(ThreadPoolDestroyedException&)
+ {
+ return;
+ }
+ catch(const exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
+ << current._handler->toString();
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
+ }
+ }
+ else if(select)
+ {
+ try
+ {
+ _selector.select(handlers, _serverIdleTime);
+ }
+ catch(SelectorTimeoutException&)
+ {
+ Lock sync(*this);
+ if(!_destroyed && _inUse == 0)
+ {
+ _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed-out.
+ }
+ continue;
+ }
+ }
+
+ {
+ Lock sync(*this);
+ if(!current._handler)
+ {
+ if(select)
+ {
+ _handlers.swap(handlers);
+ _nextHandler = _handlers.begin();
+ _selector.finishSelect();
+ select = false;
+ }
+ else if(!current._leader && followerWait(thread, current))
+ {
+ return; // Wait timed-out.
+ }
+ }
+ else if(_sizeMax > 1)
+ {
+ if(!current._ioCompleted)
+ {
+ //
+ // The handler didn't call ioCompleted() so we take care of decreasing
+ // the IO thread count now.
+ //
+ --_inUseIO;
+ }
+ else
+ {
+ //
+ // If the handler called ioCompleted(), we re-enable the handler in
+ // case it was disabled and we decrease the number of thread in use.
+ //
+ _selector.enable(current._handler.get(), current.operation);
+ assert(_inUse > 0);
+ --_inUse;
+ }
+
+ if(!current._leader && followerWait(thread, current))
+ {
+ return; // Wait timed-out.
+ }
+ }
+
+ //
+ // Get the next ready handler.
+ //
+ if(_nextHandler != _handlers.end())
+ {
+ current._ioCompleted = false;
+ current._handler = _nextHandler->first;
+ current.operation = _nextHandler->second;
+ ++_nextHandler;
+ }
+ else
+ {
+ current._handler = 0;
+ }
+
+ if(!current._handler)
+ {
+ //
+ // If there are no more ready handlers and there are still threads busy performing
+ // IO, we give up leadership and promote another follower (which will perform the
+ // select() only once all the IOs are completed). Otherwise, if there are no more
+                // threads performing IOs, it's time to do another select().
+ //
+ if(_inUseIO > 0)
+ {
+ promoteFollower(current);
+ }
+ else
+ {
+ _selector.startSelect();
+ select = true;
+ }
+ }
+ else if(_sizeMax > 1)
+ {
+ //
+ // Increment the IO thread count and if there are still threads available
+ // to perform IO and more handlers ready, we promote a follower.
+ //
+ ++_inUseIO;
+ if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO)
+ {
+ promoteFollower(current);
+ }
+ }
+ }
+ }
+#else
+ ThreadPoolCurrent current(_instance, this);
+ while(true)
+ {
+ try
+ {
+ current._ioCompleted = false;
+ current._handler = _selector.getNextHandler(current.operation, _threadIdleTime);
+ }
+ catch(const SelectorTimeoutException&)
+ {
+ if(_sizeMax > 1)
+ {
+ Lock sync(*this);
+
+ if(_destroyed)
+ {
+ continue;
+ }
+ else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit.
+ {
+                BOOL hasIO = FALSE;
+ GetThreadIOPendingFlag(GetCurrentThread(), &hasIO);
+ if(hasIO)
+ {
+ continue;
+ }
+
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1);
+ }
+ _threads.erase(thread);
+ _workQueue->queue(new JoinThreadWorkItem(thread));
+ return;
+ }
+ else if(_inUse > 0)
+ {
+ //
+ // If this is the last idle thread but there are still other threads
+ // busy dispatching, we go back waiting with _threadIdleTime. We only
+ // wait with _serverIdleTime when there's only one thread left.
+ //
+ continue;
+ }
+ assert(_threads.size() == 1);
+ }
+
+ try
+ {
+ current._handler = _selector.getNextHandler(current.operation, _serverIdleTime);
+ }
+ catch(const SelectorTimeoutException&)
+ {
+ Lock sync(*this);
+ if(!_destroyed)
+ {
+ _workQueue->queue(new ShutdownWorkItem(_instance));
+ }
+ continue;
+ }
+ }
+
+ assert(current._handler);
+ try
+ {
+ current._handler->message(current);
+
+ if(_sizeMax > 1 && current._ioCompleted)
+ {
+ Lock sync(*this);
+ assert(_inUse > 0);
+ --_inUse;
+ }
+ }
+ catch(ThreadPoolDestroyedException&)
+ {
+ return;
+ }
+ catch(const exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString();
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
+ }
+ }
+#endif
+}
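+
+// For illustration only: the select-based loop above follows the
+// leader/followers pattern. Roughly, in pseudo-C++ (hypothetical names),
+// each pool thread cycles through these roles:
+//
+//   while(true)
+//   {
+//       waitToBecomeLeader();      // followerWait(): block until promoted
+//       handlers = select();       // leader: wait for ready event handlers
+//       promoteFollower();         // hand off leadership before dispatching
+//       handler->message(current); // perform I/O, possibly dispatch
+//   }
+//
+// Only the leader calls select(); ioCompleted() lets a handler promote a
+// follower early so that a long dispatch doesn't stall the pool.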
+
+bool
+IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
+{
+    current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
+
+ if(_sizeMax > 1)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+#ifndef ICE_USE_IOCP
+ --_inUseIO;
+
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler.get(), current.operation);
+ }
+
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0))
+ {
+ notify();
+ }
+#endif
+
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ if(!_destroyed)
+ {
+ assert(_inUse <= static_cast<int>(_threads.size()));
+ if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
+ {
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "growing " << _prefix << ": Size=" << _threads.size() + 1;
+ }
+
+ try
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ if(_hasPriority)
+ {
+ thread->start(_stackSize, _priority);
+ }
+ else
+ {
+ thread->start(_stackSize);
+ }
+ _threads.insert(thread);
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
+ }
+ }
+ }
+
+ return _serialize;
+}
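+
+// For illustration only, a rough sketch of the counter invariants the code
+// above maintains (_inUseIO exists only in the select-based build):
+//
+//   _inUse   <= _threads.size() <= _sizeMax   // threads busy dispatching
+//   _inUseIO <= _sizeIO                       // threads busy with socket IO
+//
+// ioCompleted() moves the calling thread from the IO count to the dispatch
+// count and grows the pool when every existing thread is busy dispatching
+// and SizeMax hasn't been reached yet.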
+
+#ifdef ICE_USE_IOCP
+bool
+IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
+{
+ assert(current._handler->_pending & current.operation);
+
+ if(current._handler->_started & current.operation)
+ {
+ assert(!(current._handler->_ready & current.operation));
+ current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation);
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
+ if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
+ }
+ }
+ else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation))
+ {
+ assert(!(current._handler->_started & current.operation));
+ if(!current._handler->startAsync(current.operation))
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
+ }
+ else
+ {
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
+ return false;
+ }
+ }
+
+ if(current._handler->_registered & current.operation)
+ {
+ assert(current._handler->_ready & current.operation);
+ current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation);
+ return true;
+ }
+ else
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
+ }
+}
+
+void
+IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
+{
+ if(current._handler->_registered & current.operation)
+ {
+ assert(!(current._handler->_ready & current.operation));
+ if(!current._handler->startAsync(current.operation))
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ }
+ else
+ {
+ assert(current._handler->_pending & current.operation);
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
+ }
+ }
+ else
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ }
+
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ // There are no more pending async operations, it's time to call finish.
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+}
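+
+// For illustration only: _registered, _pending, _started and _ready are
+// SocketOperation bit sets, and the code above moves a single operation bit
+// between them with the usual mask idiom:
+//
+//   SocketOperation set = /* ... */;
+//   set = static_cast<SocketOperation>(set | op);   // add op to the set
+//   set = static_cast<SocketOperation>(set & ~op);  // remove op from the set
+//   bool member = (set & op) != 0;                  // test membership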
+#else
+void
+IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current)
+{
+ assert(!_promote && current._leader);
+ _promote = true;
+ if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0))
+ {
+ notify();
+ }
+ current._leader = false;
+}
+
+bool
+IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current)
+{
+ assert(!current._leader);
+
+ //
+ // It's important to clear the handler before waiting to make sure that
+ // resources for the handler are released now if it's finished. We also
+ // clear the per-thread stream.
+ //
+ current._handler = 0;
+ current.stream.clear();
+ current.stream.b.clear();
+
+ //
+ // Wait to be promoted and for all the IO threads to be done.
+ //
+ while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))
+ {
+ if(_threadIdleTime)
+ {
+ if(!timedWait(IceUtil::Time::seconds(_threadIdleTime)))
+ {
+ if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
+ (_nextHandler == _handlers.end() && _inUseIO > 0)))
+ {
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1);
+ }
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.erase(thread);
+ _workQueue->queue(new JoinThreadWorkItem(thread));
+ return true;
+ }
+ }
+ }
+ else
+ {
+ wait();
+ }
+ }
+ current._leader = true; // The current thread has become the leader.
+ _promote = false;
+ return false;
+}
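+
+// For illustration only, the wait condition above restated positively:
+// since _inUseIO never exceeds _sizeIO, a follower takes over leadership
+// exactly when
+//
+//   _promote                                // leadership was handed off
+//   && _inUseIO < _sizeIO                   // an IO slot is available
+//   && (_nextHandler != _handlers.end()     // there is work to pick up,
+//       || _inUseIO == 0)                   // or the last IO thread is done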
+#endif
+
+IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) :
+ IceUtil::Thread(pool->_prefix + " thread"),
+ _pool(pool)
+{
+}
+
+void
+IceInternal::ThreadPool::EventHandlerThread::run()
+{
+ if(_pool->_instance->initializationData().threadHook)
+ {
+ try
+ {
+ _pool->_instance->initializationData().threadHook->start();
+ }
+ catch(const exception& ex)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'";
+ }
+ }
+
+ try
+ {
+ _pool->run(this);
+ }
+ catch(const exception& ex)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "exception in `" << _pool->_prefix << "':\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "unknown exception in `" << _pool->_prefix << "'";
+ }
+
+ if(_pool->_instance->initializationData().threadHook)
+ {
+ try
+ {
+ _pool->_instance->initializationData().threadHook->stop();
+ }
+ catch(const exception& ex)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'";
+ }
+ }
+
+ _pool = 0; // Break cyclic dependency.
+}
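+
+// For illustration only: the thread hook invoked above is installed through
+// InitializationData::threadHook. A minimal sketch, assuming the Ice 3.4
+// Ice::ThreadNotification interface (ExampleHook is a hypothetical name):
+//
+//   class ExampleHook : public Ice::ThreadNotification
+//   {
+//   public:
+//       virtual void start() { /* called when a pool thread starts */ }
+//       virtual void stop()  { /* called before a pool thread exits */ }
+//   };
+//
+//   Ice::InitializationData id;
+//   id.threadHook = new ExampleHook;
+//   // Pass 'id' to Ice::initialize(...).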
+
+ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) :
+ operation(SocketOperationNone),
+ stream(instance.get()),
+ _threadPool(threadPool.get()),
+ _ioCompleted(false)
+#ifndef ICE_USE_IOCP
+ , _leader(false)
+#endif
+{
+}