summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp83
1 files changed, 41 insertions, 42 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index b2e7883c325..f7c25cc20f4 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -8,7 +8,6 @@
// **********************************************************************
#include <Ice/ThreadPool.h>
-#include <IceUtil/DisableWarnings.h>
#include <Ice/EventHandler.h>
#include <Ice/Network.h>
#include <Ice/LocalException.h>
@@ -40,7 +39,7 @@ public:
ShutdownWorkItem(const InstancePtr& instance) : _instance(instance)
{
}
-
+
virtual void
execute(ThreadPoolCurrent& current)
{
@@ -51,9 +50,9 @@ public:
}
catch(const CommunicatorDestroyedException&)
{
- }
+ }
}
-
+
private:
const InstancePtr _instance;
@@ -66,13 +65,13 @@ public:
FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close)
{
}
-
+
virtual void
execute(ThreadPoolCurrent& current)
{
_handler->finished(current, _close);
}
-
+
private:
const EventHandlerPtr _handler;
@@ -133,11 +132,11 @@ class ThreadPoolDestroyedException
}
-IceInternal::DispatchWorkItem::DispatchWorkItem()
+IceInternal::DispatchWorkItem::DispatchWorkItem()
{
}
-IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection)
+IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection)
{
}
@@ -152,7 +151,7 @@ IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instanc
_instance(instance),
_selector(selector),
_destroyed(false)
-#ifdef ICE_USE_IOCP
+#ifdef ICE_USE_IOCP
, _info(SocketOperationRead)
#endif
{
@@ -181,7 +180,7 @@ IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
Error out(_instance->initializationData().logger);
out << "exception in selector while calling closeSocket():\n" << ex;
}
-
+
try
{
closeSocket(_fdIntrWrite);
@@ -242,7 +241,7 @@ void
IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
{
ThreadPoolWorkItemPtr workItem;
- {
+ {
Lock sync(*this);
if(!_workItems.empty())
{
@@ -267,7 +266,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
{
continue;
}
-
+
SocketException ex(__FILE__, __LINE__);
ex.error = getSocketErrno();
throw ex;
@@ -345,7 +344,7 @@ IceInternal::ThreadPoolWorkQueue::postMessage()
{
continue;
}
-
+
SocketException ex(__FILE__, __LINE__);
ex.error = getSocketErrno();
throw ex;
@@ -387,7 +386,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
int nProcessors = sysInfo.dwNumberOfProcessors;
# else
int nProcessors = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
-# endif
+# endif
#endif
//
@@ -416,7 +415,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
sizeMax = size;
}
-
+
int sizeWarn = properties->getPropertyAsInt(_prefix + ".SizeWarn");
if(sizeWarn != 0 && sizeWarn < size)
{
@@ -461,7 +460,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
stackSize = 0;
}
const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
-
+
const_cast<bool&>(_hasPriority) = properties->getProperty(_prefix + ".ThreadPriority") != "";
const_cast<int&>(_priority) = properties->getPropertyAsInt(_prefix + ".ThreadPriority");
if(!_hasPriority)
@@ -469,7 +468,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
const_cast<bool&>(_hasPriority) = properties->getProperty("Ice.ThreadPriority") != "";
const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
}
-
+
_workQueue = new ThreadPoolWorkQueue(_instance, _selector);
if(_instance->traceLevels()->threadPool >= 1)
@@ -525,11 +524,11 @@ void
IceInternal::ThreadPool::destroy()
{
Lock sync(*this);
- if(_destroyed)
+ if(_destroyed)
{
return;
}
-
+
_destroyed = true;
_workQueue->destroy();
}
@@ -568,7 +567,7 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
{
return;
}
-
+
_selector.update(handler.get(), remove, add);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead))
@@ -640,7 +639,7 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI
else
{
workItem->run();
- }
+ }
}
void
@@ -699,7 +698,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
catch(const exception& ex)
{
Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
+ out << "exception in `" << _prefix << "':\n" << ex << "\nevent handler: "
<< current._handler->toString();
}
catch(...)
@@ -759,7 +758,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(!current._ioCompleted)
{
//
- // The handler didn't call ioCompleted() so we take care of decreasing
+ // The handler didn't call ioCompleted() so we take care of decreasing
// the IO thread count now.
//
--_inUseIO;
@@ -831,9 +830,9 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(!current._handler)
{
//
- // If there are no more ready handlers and there are still threads busy performing
+ // If there are no more ready handlers and there are still threads busy performing
// IO, we give up leadership and promote another follower (which will perform the
- // select() only once all the IOs are completed). Otherwise, if there are no more
+ // select() only once all the IOs are completed). Otherwise, if there are no more
// threads peforming IOs, it's time to do another select().
//
if(_inUseIO > 0)
@@ -881,7 +880,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(_sizeMax > 1)
{
Lock sync(*this);
-
+
if(_destroyed)
{
continue;
@@ -904,14 +903,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
_threads.erase(thread);
_workQueue->queue(new JoinThreadWorkItem(thread));
return;
- }
+ }
else if(_inUse > 0)
{
//
// If this is the last idle thread but there are still other threads
// busy dispatching, we go back waiting with _threadIdleTime. We only
// wait with _serverIdleTime when there's only one thread left.
- //
+ //
continue;
}
assert(_threads.size() == 1);
@@ -964,7 +963,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// because the wide string is using the platform default encoding.
//
Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "':\n"
+ out << "exception in `" << _prefix << "':\n"
<< IceUtil::wstringToString(ex->Message->Data(), _instance->getStringConverter())
<< "\nevent handler: " << current._handler->toString();
}
@@ -999,7 +998,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_sizeMax > 1)
{
-
+
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
@@ -1039,16 +1038,16 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
assert(_inUse >= 0);
++_inUse;
-
+
if(_inUse == _sizeWarn)
{
Warning out(_instance->initializationData().logger);
out << "thread pool `" << _prefix << "' is running low on threads\n"
<< "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
}
-
+
if(!_destroyed)
- {
+ {
assert(_inUse <= static_cast<int>(_threads.size()));
if(_inUse < _sizeMax && _inUse == static_cast<int>(_threads.size()))
{
@@ -1057,7 +1056,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
Trace out(_instance->initializationData().logger, _instance->traceLevels()->threadPoolCat);
out << "growing " << _prefix << ": Size=" << _threads.size() + 1;
}
-
+
try
{
EventHandlerThreadPtr thread = new EventHandlerThread(this, nextThreadId());
@@ -1195,7 +1194,7 @@ bool
IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current)
{
assert(!current._leader);
-
+
current._thread->setState(ThreadStateIdle);
//
@@ -1216,7 +1215,7 @@ IceInternal::ThreadPool::followerWait(ThreadPoolCurrent& current)
{
if(!timedWait(IceUtil::Time::seconds(_threadIdleTime)))
{
- if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
+ if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
(_nextHandler == _handlers.end() && _inUseIO > 0)))
{
if(_instance->traceLevels()->threadPool >= 1)
@@ -1246,7 +1245,7 @@ string
IceInternal::ThreadPool::nextThreadId()
{
ostringstream os;
- os << _prefix << "-" << _nextThreadId++;
+ os << _prefix << "-" << _nextThreadId++;
return os.str();
}
@@ -1316,7 +1315,7 @@ IceInternal::ThreadPool::EventHandlerThread::run()
catch(...)
{
Error out(_pool->_instance->initializationData().logger);
- out << "unknown exception in `" << _pool->_prefix << "'";
+ out << "unknown exception in `" << _pool->_prefix << "'";
}
_observer.detach();
@@ -1342,12 +1341,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
_pool = 0; // Break cyclic dependency.
}
-ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance,
+ThreadPoolCurrent::ThreadPoolCurrent(const InstancePtr& instance,
const ThreadPoolPtr& threadPool,
const ThreadPool::EventHandlerThreadPtr& thread) :
- operation(SocketOperationNone),
- stream(instance.get(), Ice::currentProtocolEncoding),
- _threadPool(threadPool.get()),
+ operation(SocketOperationNone),
+ stream(instance.get(), Ice::currentProtocolEncoding),
+ _threadPool(threadPool.get()),
_thread(thread),
_ioCompleted(false)
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)