author    Benoit Foucher <benoit@zeroc.com>  2009-08-21 15:55:01 +0200
committer Benoit Foucher <benoit@zeroc.com>  2009-08-21 15:55:01 +0200
commit    b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree      183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/ThreadPool.cpp
parent    adding compression cookbook demo (diff)
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--  cpp/src/Ice/ThreadPool.cpp  1364
1 file changed, 785 insertions(+), 579 deletions(-)
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 835b28b3c08..d74b0222ddc 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -7,6 +7,13 @@
//
// **********************************************************************
+//
+// The following is required for GetThreadIOPendingFlag
+//
+#if defined(_WIN32) && !defined(_WIN32_WINNT)
+# define _WIN32_WINNT 0x0501
+#endif
+
#include <IceUtil/DisableWarnings.h>
#include <Ice/ThreadPool.h>
#include <Ice/EventHandler.h>
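Note: GetThreadIOPendingFlag, the reason for the _WIN32_WINNT guard above, reports
whether the calling thread still has asynchronous I/O in flight; the IOCP loop later
in this patch consults it before retiring an idle thread. Below is a minimal sketch
of the call; threadHasPendingIO is a hypothetical helper, not part of the patch.

    #include <windows.h>

    // Returns true if the current thread has asynchronous I/O in flight.
    // Requires _WIN32_WINNT >= 0x0501 (Windows XP), hence the guard above.
    static bool
    threadHasPendingIO()
    {
        BOOL pending = FALSE;
        if(!GetThreadIOPendingFlag(GetCurrentThread(), &pending))
        {
            return false; // Call failed; assume no pending I/O.
        }
        return pending == TRUE;
    }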
@@ -14,7 +21,6 @@
#include <Ice/LocalException.h>
#include <Ice/Instance.h>
#include <Ice/LoggerUtil.h>
-#include <Ice/Functional.h>
#include <Ice/Protocol.h>
#include <Ice/ObjectAdapterFactory.h>
#include <Ice/Properties.h>
@@ -25,47 +31,358 @@ using namespace Ice;
using namespace IceInternal;
ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPool* p) { return p; }
+ICE_DECLSPEC_EXPORT IceUtil::Shared* IceInternal::upCast(ThreadPoolWorkItem* p) { return p; }
+
+namespace
+{
+
+class ShutdownWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ ShutdownWorkItem(const InstancePtr& instance) : _instance(instance)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ current.ioCompleted();
+ try
+ {
+ _instance->objectAdapterFactory()->shutdown();
+ }
+ catch(const CommunicatorDestroyedException&)
+ {
+ }
+ }
+
+private:
+
+ const InstancePtr _instance;
+};
+
+class FinishedWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ _handler->finished(current);
+ }
+
+private:
+
+ const EventHandlerPtr _handler;
+};
+
+class JoinThreadWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ JoinThreadWorkItem(const IceUtil::ThreadPtr& thread) : _thread(thread)
+ {
+ }
+
+ virtual void
+ execute(ThreadPoolCurrent&)
+ {
+        // No call to ioCompleted(): this shouldn't block (and we don't want
+        // to cause a new thread to be started).
+ _thread->getThreadControl().join();
+ }
+
+private:
+
+ IceUtil::ThreadPtr _thread;
+};
+
+//
+// Exception raised by the thread pool work queue when the thread pool
+// is destroyed.
+//
+class ThreadPoolDestroyedException
+{
+};
+
+}
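Note: the work items above are a small command pattern: execute() runs on a pool
thread, and an item that may block calls current.ioCompleted() first so the pool can
promote another IO thread (ShutdownWorkItem does; JoinThreadWorkItem deliberately
doesn't). Below is a hedged sketch of a user-defined item; LogWorkItem is a
hypothetical name and assumes only the interfaces shown in this patch.

    // Hypothetical work item that logs a message from a pool thread.
    class LogWorkItem : public ThreadPoolWorkItem
    {
    public:

        LogWorkItem(const InstancePtr& instance, const string& msg) :
            _instance(instance), _msg(msg)
        {
        }

        virtual void
        execute(ThreadPoolCurrent& current)
        {
            current.ioCompleted(); // Logging may block; release the IO slot first.
            Trace out(_instance->initializationData().logger, "WorkItem");
            out << _msg;
        }

    private:

        const InstancePtr _instance;
        const string _msg;
    };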
+
+namespace IceInternal
+{
+
+class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex
+{
+public:
+
+ ThreadPoolWorkQueue(ThreadPool* threadPool, const InstancePtr& instance, Selector& selector) :
+ _threadPool(threadPool),
+ _instance(instance),
+ _selector(selector),
+ _destroyed(false)
+#ifdef ICE_USE_IOCP
+ , _info(SocketOperationRead)
+#endif
+ {
+#ifndef ICE_USE_IOCP
+ SOCKET fds[2];
+ createPipe(fds);
+ _fdIntrRead = fds[0];
+ _fdIntrWrite = fds[1];
+
+ _selector.initialize(this);
+ _selector.update(this, SocketOperationNone, SocketOperationRead);
+#endif
+ }
+
+ ~ThreadPoolWorkQueue()
+ {
+ assert(_destroyed);
+
+#ifndef ICE_USE_IOCP
+ try
+ {
+ closeSocket(_fdIntrRead);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+
+ try
+ {
+ closeSocket(_fdIntrWrite);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in selector while calling closeSocket():\n" << ex;
+ }
+#endif
+ }
+
+ void destroy()
+ {
+ Lock sync(*this);
+ assert(!_destroyed);
+ _destroyed = true;
+ postMessage();
+ }
+
+ void queue(const ThreadPoolWorkItemPtr& item)
+ {
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
+ _workItems.push_back(item);
+ if(_workItems.size() == 1)
+ {
+ postMessage();
+ }
+ }
+
+#ifdef ICE_USE_IOCP
+ bool startAsync(SocketOperation)
+ {
+ assert(false);
+ return false;
+ }
+
+ bool finishAsync(SocketOperation)
+ {
+ assert(false);
+ return false;
+ }
+#endif
+
+ virtual void message(ThreadPoolCurrent& current)
+ {
+ ThreadPoolWorkItemPtr workItem;
+ {
+ Lock sync(*this);
+ if(!_workItems.empty())
+ {
+ workItem = _workItems.front();
+ _workItems.pop_front();
+
+ if(_workItems.empty())
+ {
+#ifndef ICE_USE_IOCP
+ char c;
+ while(true)
+ {
+ ssize_t ret;
+#ifdef _WIN32
+ ret = ::recv(_fdIntrRead, &c, 1, 0);
+#else
+ ret = ::read(_fdIntrRead, &c, 1);
+#endif
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+#endif
+ }
+ }
+ else
+ {
+ assert(_destroyed);
+ }
+ }
+
+ if(workItem)
+ {
+ workItem->execute(current);
+ }
+ else
+ {
+ current.ioCompleted();
+ throw ThreadPoolDestroyedException();
+ }
+ }
+
+ virtual void finished(ThreadPoolCurrent&)
+ {
+ assert(false);
+ }
+
+ virtual string toString() const
+ {
+ return "work queue";
+ }
+
+ virtual NativeInfoPtr getNativeInfo()
+ {
+#ifndef ICE_USE_IOCP
+ return new NativeInfo(_fdIntrRead);
+#endif
+ return 0;
+ }
+
+ virtual void postMessage()
+ {
+#ifndef ICE_USE_IOCP
+ char c = 0;
+ while(true)
+ {
+#ifdef _WIN32
+ if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+#else
+ if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
+#endif
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ break;
+ }
+#else
+ if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info))
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
+ }
+#endif
+ }
+
+private:
+
+ const ThreadPool* _threadPool;
+ const InstancePtr _instance;
+ Selector& _selector;
+ bool _destroyed;
+#ifdef ICE_USE_IOCP
+ AsyncInfo _info;
+#else
+ SOCKET _fdIntrRead;
+ SOCKET _fdIntrWrite;
+#endif
+ list<ThreadPoolWorkItemPtr> _workItems;
+};
+
+}
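Note: on non-IOCP platforms the work queue above wakes the selector with the classic
self-pipe trick: postMessage() writes a single byte to a pipe whose read end the
selector watches, and message() drains it once the queue empties; on IOCP builds,
postMessage() instead posts a completion packet with PostQueuedCompletionStatus. A
standalone, POSIX-only sketch of the idiom follows; SelfPipe and its members are
hypothetical names, not part of the patch.

    #include <cerrno>
    #include <unistd.h>

    struct SelfPipe
    {
        int fds[2]; // fds[0]: read end watched by the poller; fds[1]: write end.

        SelfPipe() { ::pipe(fds); }
        ~SelfPipe() { ::close(fds[0]); ::close(fds[1]); }

        // Wake the thread blocked in select()/poll(), retrying on EINTR.
        void post()
        {
            char c = 0;
            while(::write(fds[1], &c, 1) == -1 && errno == EINTR)
                ;
        }

        // Drain one wakeup byte once the poller reports fds[0] readable.
        void drain()
        {
            char c;
            while(::read(fds[0], &c, 1) == -1 && errno == EINTR)
                ;
        }
    };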
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
_instance(instance),
_destroyed(false),
_prefix(prefix),
- _selector(instance, timeout),
+ _selector(instance),
_size(0),
+ _sizeIO(0),
_sizeMax(0),
_sizeWarn(0),
_serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0),
+ _hasPriority(false),
+ _priority(0),
+ _serverIdleTime(timeout),
+ _threadIdleTime(0),
_stackSize(0),
- _running(0),
_inUse(0),
- _load(1.0),
- _promote(true),
- _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0),
- _hasPriority(false),
- _priority(0)
+#ifndef ICE_USE_IOCP
+ _inUseIO(0),
+ _nextHandler(_handlers.end()),
+#endif
+ _promote(true)
{
+ PropertiesPtr properties = _instance->initializationData().properties;
+
+#ifdef _WIN32
+ SYSTEM_INFO sysInfo;
+ GetSystemInfo(&sysInfo);
+ int nProcessors = sysInfo.dwNumberOfProcessors;
+#else
+ int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
+#endif
+
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
    // doesn't require the servants to be thread-safe.
//
- int size = _instance->initializationData().properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ int size = properties->getPropertyAsIntWithDefault(_prefix + ".Size", 1);
if(size < 1)
{
Warning out(_instance->initializationData().logger);
out << _prefix << ".Size < 1; Size adjusted to 1";
size = 1;
}
-
- int sizeMax = _instance->initializationData().properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+
+ int sizeMax = properties->getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax == -1)
+ {
+ sizeMax = nProcessors;
+ }
if(sizeMax < size)
{
Warning out(_instance->initializationData().logger);
out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
sizeMax = size;
- }
+ }
- int sizeWarn = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".SizeWarn");
+ int sizeWarn = properties->getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100);
if(sizeWarn != 0 && sizeWarn < size)
{
Warning out(_instance->initializationData().logger);
@@ -79,11 +396,25 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
sizeWarn = sizeMax;
}
+ int threadIdleTime = properties->getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+ if(threadIdleTime < 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+ threadIdleTime = 0;
+ }
+
const_cast<int&>(_size) = size;
const_cast<int&>(_sizeMax) = sizeMax;
const_cast<int&>(_sizeWarn) = sizeWarn;
+ const_cast<int&>(_sizeIO) = min(sizeMax, nProcessors);
+ const_cast<int&>(_threadIdleTime) = threadIdleTime;
- int stackSize = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".StackSize");
+#ifdef ICE_USE_IOCP
+ _selector.setup(_sizeIO);
+#endif
+
+ int stackSize = properties->getPropertyAsInt(_prefix + ".StackSize");
if(stackSize < 0)
{
Warning out(_instance->initializationData().logger);
@@ -92,15 +423,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
}
const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
-
- const_cast<bool&>(_hasPriority) = _instance->initializationData().properties->getProperty(_prefix + ".ThreadPriority") != "";
- const_cast<int&>(_priority) = _instance->initializationData().properties->getPropertyAsInt(_prefix + ".ThreadPriority");
+ const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
+ const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
if(!_hasPriority)
{
- const_cast<bool&>(_hasPriority) = _instance->initializationData().properties->getProperty("Ice.ThreadPriority") != "";
- const_cast<int&>(_priority) = _instance->initializationData().properties->getPropertyAsInt("Ice.ThreadPriority");
+ const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
+ const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
}
+ _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+
if(_instance->traceLevels()->threadPool >= 1)
{
Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
@@ -122,8 +454,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
{
thread->start(_stackSize);
}
- _threads.push_back(thread);
- ++_running;
+ _threads.insert(thread);
}
}
catch(const IceUtil::Exception& ex)
@@ -131,9 +462,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
{
Error out(_instance->initializationData().logger);
out << "cannot create thread for `" << _prefix << "':\n" << ex;
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
}
destroy();
@@ -157,171 +485,83 @@ IceInternal::ThreadPool::~ThreadPool()
void
IceInternal::ThreadPool::destroy()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
- _selector.setInterrupt();
+ _workQueue->destroy();
}
void
-IceInternal::ThreadPool::incFdsInUse()
+IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
{
-#ifdef ICE_USE_SELECT
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _selector.incFdsInUse();
-#endif
+ Lock sync(*this);
+ assert(!_destroyed);
+ _selector.initialize(handler.get());
}
void
-IceInternal::ThreadPool::decFdsInUse()
+IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation remove, SocketOperation add)
{
-#ifdef ICE_USE_SELECT
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _selector.decFdsInUse();
-#endif
+ Lock sync(*this);
+ assert(!_destroyed);
+ _selector.update(handler.get(), remove, add);
}
void
-IceInternal::ThreadPool::_register(const EventHandlerPtr& handler)
+IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed && handler->_fd != INVALID_SOCKET);
- if(!handler->_registered)
- {
- if(!handler->_serializing)
- {
- _selector.add(handler.get(), NeedRead);
- }
- handler->_registered = true;
- }
-}
+ Lock sync(*this);
+ assert(!_destroyed);
+#ifndef ICE_USE_IOCP
+    _selector.finish(handler.get()); // This must be called before queueing the FinishedWorkItem.
+ _workQueue->queue(new FinishedWorkItem(handler));
-void
-IceInternal::ThreadPool::unregister(const EventHandlerPtr& handler)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed && handler->_fd != INVALID_SOCKET);
- if(handler->_registered)
+ //
+    // Clear the current ready handlers. The handlers in this vector aren't
+    // reference-counted, so a handler might get destroyed once it's finished.
+ //
+ _handlers.clear();
+ _nextHandler = _handlers.end();
+#else
+ // If there are no pending asynchronous operations, we can call finish on the handler now.
+ if(!handler->_pending)
{
- if(!handler->_serializing)
- {
- _selector.remove(handler.get(), NeedRead);
- }
- handler->_registered = false;
+ _workQueue->queue(new FinishedWorkItem(handler));
+ _selector.finish(handler.get());
}
-}
-
-void
-IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed && handler->_fd != INVALID_SOCKET);
- if(handler->_registered)
+ else
{
- if(!handler->_serializing)
- {
- _selector.remove(handler.get(), NeedRead);
- }
- handler->_registered = false;
+ handler->_finish = true;
}
- _finished.push_back(handler);
- _selector.setInterrupt();
+#endif
}
void
IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_destroyed)
- {
- throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
- }
- _workItems.push_back(workItem);
- _selector.setInterrupt();
-}
-
-void
-IceInternal::ThreadPool::promoteFollower(EventHandler* handler)
-{
- if(_sizeMax > 1)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(_serialize && handler)
- {
- handler->_serializing = true;
- if(handler->_registered)
- {
- _selector.remove(handler, NeedRead, true); // No interrupt, no thread is blocked on select().
- }
- }
-
- assert(!_promote);
- _promote = true;
- notify();
-
- if(!_destroyed)
- {
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- Warning out(_instance->initializationData().logger);
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
- }
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
- {
- if(_instance->traceLevels()->threadPool >= 1)
- {
- Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
- out << "growing " << _prefix << ": Size = " << (_running + 1);
- }
-
- try
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- if(_hasPriority)
- {
- thread->start(_stackSize, _priority);
- }
- else
- {
- thread->start(_stackSize);
- }
- _threads.push_back(thread);
- ++_running;
- }
- catch(const IceUtil::Exception& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
- }
- }
- }
- }
+ _workQueue->queue(workItem);
}
void
IceInternal::ThreadPool::joinWithAllThreads()
{
+ assert(_destroyed);
+
//
// _threads is immutable after destroy() has been called,
// therefore no synchronization is needed. (Synchronization
// wouldn't be possible here anyway, because otherwise the other
// threads would never terminate.)
//
- assert(_destroyed);
- for(vector<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
+ for(set<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
{
(*p)->getThreadControl().join();
}
+
+#ifndef ICE_USE_IOCP
+ _selector.finish(_workQueue.get());
+#endif
+ _selector.destroy();
}
string
@@ -330,505 +570,461 @@ IceInternal::ThreadPool::prefix() const
return _prefix;
}
-bool
-IceInternal::ThreadPool::run()
+void
+IceInternal::ThreadPool::run(const IceUtil::ThreadPtr& thread)
{
- ThreadPoolPtr self = this;
-
- if(_sizeMax > 1)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(!_promote)
- {
- wait();
- }
-
- _promote = false;
- }
-
+#ifndef ICE_USE_IOCP
+ ThreadPoolCurrent current(_instance, this);
+ bool select = false;
+ vector<pair<EventHandler*, SocketOperation> > handlers;
while(true)
{
- int ret;
- try
+ if(current._handler)
{
- ret = _selector.select();
+ try
+ {
+ current._handler->message(current);
+ }
+ catch(ThreadPoolDestroyedException&)
+ {
+ return;
+ }
+ catch(const exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
+ << current._handler->toString();
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
+ }
}
- catch(const Ice::LocalException& ex)
+ else if(select)
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "':\n" << ex;
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
- continue;
+ try
+ {
+ _selector.select(handlers, _serverIdleTime);
+ }
+ catch(SelectorTimeoutException&)
+ {
+ Lock sync(*this);
+ if(!_destroyed && _inUse == 0)
+ {
+                    _workQueue->queue(new ShutdownWorkItem(_instance)); // Select timed out.
+ }
+ continue;
+ }
}
- EventHandlerPtr handler;
- ThreadPoolWorkItemPtr workItem;
- bool finished = false;
- bool shutdown = false;
-
- if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
{
- shutdown = true;
- }
- else
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_selector.isInterrupted())
+ Lock sync(*this);
+ if(!current._handler)
{
- if(_selector.processInterrupt())
+ if(select)
{
- continue;
+ _handlers.swap(handlers);
+ _nextHandler = _handlers.begin();
+ _selector.finishSelect();
+ select = false;
}
-
- //
-        // There are three possibilities for an interrupt:
- //
- // 1. An event handler is being finished (closed).
- //
- // 2. The thread pool has been destroyed.
- //
-        // 3. A work item has been scheduled.
- //
- if(!_finished.empty())
+ else if(!current._leader && followerWait(thread, current))
{
- _selector.clearInterrupt();
- handler = _finished.front();
- _finished.pop_front();
- finished = true;
+                return; // Wait timed out.
}
- else if(!_workItems.empty())
+ }
+ else if(_sizeMax > 1)
+ {
+ if(!current._ioCompleted)
{
//
- // Work items must be executed first even if the thread pool is destroyed.
+ // The handler didn't call ioCompleted() so we take care of decreasing
+ // the IO thread count now.
//
- _selector.clearInterrupt();
- workItem = _workItems.front();
- _workItems.pop_front();
+ --_inUseIO;
}
- else if(_destroyed)
+ else
{
//
- // Don't clear the interrupt if destroyed, so that the other threads exit as well.
+ // If the handler called ioCompleted(), we re-enable the handler in
+                // case it was disabled and we decrease the number of threads in use.
//
- return true;
+ _selector.enable(current._handler.get(), current.operation);
+ assert(_inUse > 0);
+ --_inUse;
}
- else
- {
- assert(false);
- }
- }
- else
- {
- handler = _selector.getNextSelected();
- if(!handler)
+
+ if(!current._leader && followerWait(thread, current))
{
- continue;
+                return; // Wait timed out.
}
}
- }
-
- //
- // Now we are outside the thread synchronization.
- //
- if(shutdown)
- {
- //
- // Initiate server shutdown.
- //
- ObjectAdapterFactoryPtr factory;
- try
- {
- factory = _instance->objectAdapterFactory();
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- continue;
- }
-
- promoteFollower();
- factory->shutdown();
//
- // No "continue", because we want shutdown to be done in
- // its own thread from this pool. Therefore we called
- // promoteFollower().
+ // Get the next ready handler.
//
- }
- else if(workItem)
- {
- try
+ if(_nextHandler != _handlers.end())
{
- //
- // "self" is faster than "this", as the reference
- // count is not modified.
- //
- workItem->execute(self);
+ current._ioCompleted = false;
+ current._handler = _nextHandler->first;
+ current.operation = _nextHandler->second;
+ ++_nextHandler;
}
- catch(const LocalException& ex)
+ else
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling execute():\n" << ex;
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
+ current._handler = 0;
}
-
- //
- // No "continue", because we want execute() to be
- // called in its own thread from this pool. Note that
- // this means that execute() must call
- // promoteFollower().
- //
- }
- else
- {
- assert(handler);
-
- if(finished)
+
+ if(!current._handler)
{
//
                // Notify a handler about its removal from the thread pool.
+ // If there are no more ready handlers and there are still threads busy performing
+ // IO, we give up leadership and promote another follower (which will perform the
+                // select() only once all the IO is completed). Otherwise, if there are no more
+                // threads performing IO, it's time to do another select().
//
- try
+ if(_inUseIO > 0)
{
- //
- // "self" is faster than "this", as the reference count is not modified.
- //
- handler->finished(self);
+ promoteFollower(current);
}
- catch(const LocalException& ex)
+ else
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling finished():\n"
- << ex << '\n' << handler->toString();
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
+ _selector.startSelect();
+ select = true;
}
-
- //
- // No "continue", because we want finished() to be
- // called in its own thread from this pool. Note that
- // this means that finished() must call
- // promoteFollower().
- //
}
- else
+ else if(_sizeMax > 1)
{
//
- // If the handler is "readable", try to read a message.
+            // Increment the IO thread count and, if there are still threads available
+            // to perform IO and more handlers are ready, promote a follower.
//
- BasicStream stream(_instance.get());
- if(handler->readable())
+ ++_inUseIO;
+ if(_nextHandler != _handlers.end() && _inUseIO < _sizeIO)
{
- try
- {
- if(!read(handler))
- {
- continue; // Can't read without blocking.
- }
- }
- catch(const TimeoutException&)
- {
- assert(false); // This shouldn't occur as we only perform non-blocking reads.
- continue;
- }
- catch(const DatagramLimitException&) // Expected.
- {
- handler->_stream.resize(0);
- handler->_stream.i = stream.b.begin();
- continue;
- }
- catch(const SocketException& ex)
+ promoteFollower(current);
+ }
+ }
+ }
+ }
+#else
+ ThreadPoolCurrent current(_instance, this);
+ while(true)
+ {
+ try
+ {
+ current._ioCompleted = false;
+ current._handler = _selector.getNextHandler(current.operation, _threadIdleTime);
+ }
+ catch(const SelectorTimeoutException&)
+ {
+ if(_sizeMax > 1)
+ {
+ Lock sync(*this);
+
+ if(_destroyed)
+ {
+ continue;
+ }
+ else if(_inUse < static_cast<int>(_threads.size() - 1)) // If not the last idle thread, we can exit.
+ {
+ BOOL hasIO = false;
+ GetThreadIOPendingFlag(GetCurrentThread(), &hasIO);
+ if(hasIO)
{
- handler->exception(ex);
continue;
}
- catch(const LocalException& ex)
+
+ if(_instance->traceLevels()->threadPool >= 1)
{
- if(handler->datagram())
- {
- if(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0)
- {
- Warning out(_instance->initializationData().logger);
- out << "datagram connection exception:\n" << ex << '\n' << handler->toString();
- }
- handler->_stream.resize(0);
- handler->_stream.i = stream.b.begin();
- }
- else
- {
- handler->exception(ex);
- }
- continue;
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "shrinking " << _prefix << ": Size = " << (_threads.size() - 1);
}
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
+ _threads.erase(thread);
+ _workQueue->queue(new JoinThreadWorkItem(thread));
+ return;
}
-
- //
-        // Provide a new message to the handler.
- //
- try
+ else if(_inUse > 0)
{
//
- // "self" is faster than "this", as the reference count is not modified.
- //
- handler->message(stream, self);
+ // If this is the last idle thread but there are still other threads
+ // busy dispatching, we go back waiting with _threadIdleTime. We only
+ // wait with _serverIdleTime when there's only one thread left.
+ //
+ continue;
}
- catch(const LocalException& ex)
+ assert(_threads.size() == 1);
+ }
+
+ try
+ {
+ current._handler = _selector.getNextHandler(current.operation, _serverIdleTime);
+ }
+ catch(const SelectorTimeoutException&)
+ {
+ Lock sync(*this);
+ if(!_destroyed)
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling message():\n"
- << ex << '\n' << handler->toString();
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
+ _workQueue->queue(new ShutdownWorkItem(_instance));
}
-
- //
- // No "continue", because we want message() to be
- // called in its own thread from this pool. Note that
- // this means that message() must call
- // promoteFollower().
- //
+ continue;
}
}
- if(_sizeMax > 1)
+ assert(current._handler);
+ try
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ current._handler->message(current);
- if(!_destroyed)
+ if(_sizeMax > 1 && current._ioCompleted)
{
- if(_serialize && handler && handler->_serializing)
- {
- if(handler->_registered)
- {
- // No interrupt if no thread is blocked on select (_promote == true)
- _selector.add(handler.get(), NeedRead, _promote);
- }
- handler->_serializing = false;
- }
+ Lock sync(*this);
+ assert(_inUse > 0);
+ --_inUse;
+ }
+ }
+ catch(ThreadPoolDestroyedException&)
+ {
+ return;
+ }
+ catch(const exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: " << current._handler->toString();
+ }
+ catch(...)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
+ }
+ }
+#endif
+}
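Note: the non-IOCP run() above is a leader/followers loop: one leader thread calls
select(), hands out ready handlers, and promotes a successor before dispatching, so
at most one thread ever sits in select(). The core handshake, reduced to a hedged
standalone sketch; LeaderGate is a hypothetical name, and the sketch uses modern C++
for brevity where the patch uses the pool's own monitor.

    #include <condition_variable>
    #include <mutex>

    class LeaderGate
    {
    public:

        // A follower blocks here until promoted, then becomes the unique leader.
        void waitToLead()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _cond.wait(lock, [this]{ return _promote; });
            _promote = false;
        }

        // The current leader hands off leadership before doing long work.
        void promoteFollower()
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _promote = true;
            _cond.notify_one();
        }

    private:

        std::mutex _mutex;
        std::condition_variable _cond;
        bool _promote = false;
    };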
- if(_size < _sizeMax) // Dynamic thread pool
- {
- //
- // First we reap threads that have been destroyed before.
- //
- int sz = static_cast<int>(_threads.size());
- assert(_running <= sz);
- if(_running < sz)
- {
- vector<IceUtil::ThreadPtr>::iterator start =
- partition(_threads.begin(), _threads.end(),
- IceUtil::constMemFun(&IceUtil::Thread::isAlive));
+bool
+IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
+{
+    current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
- for(vector<IceUtil::ThreadPtr>::iterator p = start; p != _threads.end(); ++p)
- {
- (*p)->getThreadControl().join();
- }
+ if(_sizeMax > 1)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+#ifndef ICE_USE_IOCP
+ --_inUseIO;
+
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler.get(), current.operation);
+ }
- _threads.erase(start, _threads.end());
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler != _handlers.end() || _inUseIO == 0))
+ {
+ notify();
+ }
+#endif
- //
- // The load factor jumps immediately to the number of
- // threads that are currently in use, but decays
- // exponentially if the number of threads in use is
- // smaller than the load factor. This reflects that we
- // create threads immediately when they are needed,
- // but want the number of threads to slowly decline to
- // the configured minimum.
- //
- double inUse = static_cast<double>(_inUse);
- if(_load < inUse)
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ if(!_destroyed)
+ {
+ assert(_inUse <= static_cast<int>(_threads.size()));
+ if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
+ {
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "growing " << _prefix << ": Size=" << _threads.size() + 1;
+ }
+
+ try
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ if(_hasPriority)
{
- _load = inUse;
+ thread->start(_stackSize, _priority);
}
else
{
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + inUse * loadFactor;
- }
-
- if(_running > _size)
- {
- int load = static_cast<int>(_load + 0.5);
-
- //
-                        // We add one to the load factor because one
- // additional thread is needed for select().
- //
- if(load + 1 < _running)
- {
- if(_instance->traceLevels()->threadPool >= 1)
- {
- Trace out(_instance->initializationData().logger,
- _instance->traceLevels()->threadPoolCat);
- out << "shrinking " << _prefix << ": Size = " << (_running - 1);
- }
-
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
+ thread->start(_stackSize);
}
+ _threads.insert(thread);
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
}
-
- assert(_inUse > 0);
- --_inUse;
- }
-
- //
- // Do not wait to be promoted again to release these objects.
- //
- handler = 0;
- workItem = 0;
-
- while(!_promote)
- {
- wait();
}
-
- _promote = false;
}
}
+
+ return _serialize;
}
+#ifdef ICE_USE_IOCP
bool
-IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
+IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
{
- BasicStream& stream = handler->_stream;
+ assert(current._handler->_pending & current.operation);
- if(stream.i - stream.b.begin() >= headerSize)
+ if(current._handler->_started & current.operation)
{
- if(!handler->read(stream))
+ assert(!(current._handler->_ready & current.operation));
+ current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation);
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
+ if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
{
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
+ }
+ }
+ else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation))
+ {
+ assert(!(current._handler->_started & current.operation));
+ if(!current._handler->startAsync(current.operation))
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
+ }
+ else
+ {
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
return false;
}
- assert(stream.i == stream.b.end());
- return true;
}
- if(stream.b.size() == 0)
+ if(current._handler->_registered & current.operation)
{
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
+ assert(current._handler->_ready & current.operation);
+ current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation);
+ return true;
+ }
+ else
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+ return false;
}
+}
- if(stream.i != stream.b.end())
+void
+IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
+{
+ if(current._handler->_registered & current.operation)
{
- if(!handler->read(stream))
+ assert(!(current._handler->_ready & current.operation));
+ if(!current._handler->startAsync(current.operation))
{
- return false;
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ }
+ else
+ {
+ assert(current._handler->_pending & current.operation);
+ current._handler->_started = static_cast<SocketOperation>(current._handler->_started | current.operation);
}
- assert(stream.i == stream.b.end());
}
-
- ptrdiff_t pos = stream.i - stream.b.begin();
- if(pos < headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
-
- stream.i = stream.b.begin();
- const Byte* m;
- stream.readBlob(m, static_cast<Int>(sizeof(magic)));
- if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
- {
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
- throw ex;
- }
- Byte pMajor;
- Byte pMinor;
- stream.read(pMajor);
- stream.read(pMinor);
- if(pMajor != protocolMajor
- || static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor))
- {
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
- }
- Byte eMajor;
- Byte eMinor;
- stream.read(eMajor);
- stream.read(eMinor);
- if(eMajor != encodingMajor
- || static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor))
- {
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
- }
- Byte messageType;
- stream.read(messageType);
- Byte compress;
- stream.read(compress);
- Int size;
- stream.read(size);
- if(size < headerSize)
- {
- throw IllegalMessageSizeException(__FILE__, __LINE__);
- }
- if(size > static_cast<Int>(_instance->messageSizeMax()))
- {
- Ex::throwMemoryLimitException(__FILE__, __LINE__, size, _instance->messageSizeMax());
- }
- if(size > static_cast<Int>(stream.b.size()))
- {
- stream.b.resize(size);
- }
- stream.i = stream.b.begin() + pos;
-
- if(stream.i != stream.b.end())
+ else
+ {
+ current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
+ }
+
+ if(!current._handler->_pending && current._handler->_finish)
+ {
+ // There are no more pending async operations, it's time to call finish.
+ _workQueue->queue(new FinishedWorkItem(current._handler));
+ _selector.finish(current._handler.get());
+ }
+}
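Note: startMessage() and finishMessage() above drive a per-handler state machine over
four SocketOperation bitmasks (_registered, _pending, _started, _ready); every
static_cast in them is just a set or clear on one of those bitsets. A hedged sketch
of the bookkeeping; the enum values and the setOp/clearOp helpers are illustrative
stand-ins for the real declarations elsewhere in Ice.

    // Simplified stand-ins; actual values and declarations live elsewhere in Ice.
    enum SocketOperation
    {
        SocketOperationNone = 0,
        SocketOperationRead = 1,
        SocketOperationWrite = 2
    };

    inline SocketOperation
    setOp(SocketOperation state, SocketOperation op)
    {
        return static_cast<SocketOperation>(state | op); // e.g. _started |= op
    }

    inline SocketOperation
    clearOp(SocketOperation state, SocketOperation op)
    {
        return static_cast<SocketOperation>(state & ~op); // e.g. _pending &= ~op
    }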
+#else
+void
+IceInternal::ThreadPool::promoteFollower(ThreadPoolCurrent& current)
+{
+ assert(!_promote && current._leader);
+ _promote = true;
+ if(_inUseIO < _sizeIO && (_nextHandler != _handlers.end() || _inUseIO == 0))
+ {
+ notify();
+ }
+ current._leader = false;
+}
+
+bool
+IceInternal::ThreadPool::followerWait(const IceUtil::ThreadPtr& thread, ThreadPoolCurrent& current)
+{
+ assert(!current._leader);
+
+ //
+ // It's important to clear the handler before waiting to make sure that
+ // resources for the handler are released now if it's finished. We also
+ // clear the per-thread stream.
+ //
+ current._handler = 0;
+ current.stream.clear();
+ current.stream.b.clear();
+
+ //
+ // Wait to be promoted and for all the IO threads to be done.
+ //
+    while(!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0))
{
- if(handler->datagram())
+ if(_threadIdleTime)
{
- if(_warnUdp)
+ if(!timedWait(IceUtil::Time::seconds(_threadIdleTime)))
{
- Warning out(_instance->initializationData().logger);
- out << "DatagramLimitException: maximum size of " << pos << " exceeded";
+                if(!_destroyed && (!_promote || _inUseIO == _sizeIO || (_nextHandler == _handlers.end() && _inUseIO > 0)))
+ {
+ if(_instance->traceLevels()->threadPool >= 1)
+ {
+ Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
+ out << "shrinking " << _prefix << ": Size=" << (_threads.size() - 1);
+ }
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.erase(thread);
+ _workQueue->queue(new JoinThreadWorkItem(thread));
+ return true;
+ }
}
- throw DatagramLimitException(__FILE__, __LINE__);
}
else
{
- if(!handler->read(stream))
- {
- return false;
- }
- assert(stream.i == stream.b.end());
+ wait();
}
}
-
- return true;
+ current._leader = true; // The current thread has become the leader.
+ _promote = false;
+ return false;
}
+#endif
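Note: followerWait()'s loop condition keeps a follower blocked while any of three
conditions holds. Restated as a hedged helper for readability; mustKeepWaiting is a
hypothetical name, not part of the patch.

    // Readability restatement of the wait predicate in followerWait().
    bool
    mustKeepWaiting(bool promote, int inUseIO, int sizeIO, bool handlersReady)
    {
        return !promote                          // Not yet promoted to leader.
            || inUseIO == sizeIO                 // Every IO slot is occupied.
            || (!handlersReady && inUseIO > 0);  // Nothing ready, IO still in flight.
    }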
IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPoolPtr& pool) :
IceUtil::Thread(pool->_prefix + " thread"),
@@ -841,55 +1037,65 @@ IceInternal::ThreadPool::EventHandlerThread::run()
{
if(_pool->_instance->initializationData().threadHook)
{
- _pool->_instance->initializationData().threadHook->start();
+ try
+ {
+ _pool->_instance->initializationData().threadHook->start();
+ }
+ catch(const exception& ex)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook start() method raised an unexpected exception in `" << _pool->_prefix << "'";
+ }
}
- bool promote;
-
try
{
- promote = _pool->run();
+ _pool->run(this);
}
- catch(const Ice::Exception& ex)
+ catch(const exception& ex)
{
Error out(_pool->_instance->initializationData().logger);
- out << "exception in `" << _pool->_prefix << "':\n" << ex.what();
-#ifdef __GNUC__
- out << "\n" << ex.ice_stackTrace();
-#endif
- promote = true;
- }
- catch(const std::exception& ex)
- {
- Error out(_pool->_instance->initializationData().logger);
- out << "exception in `" << _pool->_prefix << "':\n" << ex.what();
- promote = true;
+ out << "exception in `" << _pool->_prefix << "':\n" << ex;
}
catch(...)
{
Error out(_pool->_instance->initializationData().logger);
out << "unknown exception in `" << _pool->_prefix << "'";
- promote = true;
}
- if(promote && _pool->_sizeMax > 1)
+ if(_pool->_instance->initializationData().threadHook)
{
- //
- // Promote a follower, but w/o modifying _inUse or creating
- // new threads.
- //
+ try
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
- assert(!_pool->_promote);
- _pool->_promote = true;
- _pool->notify();
+ _pool->_instance->initializationData().threadHook->stop();
+ }
+ catch(const exception& ex)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "':\n" << ex;
+ }
+ catch(...)
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "thread hook stop() method raised an unexpected exception in `" << _pool->_prefix << "'";
}
- }
-
- if(_pool->_instance->initializationData().threadHook)
- {
- _pool->_instance->initializationData().threadHook->stop();
}
_pool = 0; // Break cyclic dependency.
}
+
+ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance, const ThreadPoolPtr& threadPool) :
+ operation(SocketOperationNone),
+ stream(instance.get()),
+ _threadPool(threadPool.get()),
+ _ioCompleted(false)
+#ifndef ICE_USE_IOCP
+ , _leader(false)
+#endif
+{
+}