summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp132
1 files changed, 29 insertions, 103 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index fcee3afb792..07dedbead26 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -30,14 +30,7 @@ void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (handler->server())
- {
- ++_servers;
- }
- else
- {
- ++_clients;
- }
+ ++_handlers;
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
}
@@ -51,70 +44,30 @@ IceInternal::ThreadPool::unregister(SOCKET fd)
}
void
-IceInternal::ThreadPool::serverIsNowClient()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- ++_clients;
- assert(_servers > 0);
- --_servers;
- if (_servers == 0)
- {
- notifyAll(); // For waitUntil...Finished() methods.
- }
-}
-
-void
-IceInternal::ThreadPool::clientIsNowServer()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- ++_servers;
- assert(_clients > 0);
- --_clients;
-}
-
-void
IceInternal::ThreadPool::promoteFollower()
{
_threadMutex.unlock();
}
void
-IceInternal::ThreadPool::initiateServerShutdown()
+IceInternal::ThreadPool::initiateShutdown()
{
setInterrupt(1);
}
void
-IceInternal::ThreadPool::waitUntilServerFinished()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while (_servers != 0 && _threadNum != 0)
- {
- wait();
- }
-
- if (_servers != 0)
- {
- Error out(_logger);
- out << "can't wait for graceful server termination in thread pool\n"
- << "since all threads have vanished";
- }
-}
-
-void
IceInternal::ThreadPool::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while (_clients + _servers != 0 && _threadNum != 0)
+ while (_handlers != 0 && _threadNum != 0)
{
wait();
}
- if (_clients + _servers != 0)
+ if (_handlers != 0)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "can't wait for graceful application termination in thread pool\n"
<< "since all threads have vanished";
}
@@ -139,35 +92,11 @@ IceInternal::ThreadPool::joinWithAllThreads()
}
}
-void
-IceInternal::ThreadPool::setMaxConnections(int maxConnections)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (maxConnections < _threadNum + 1 && maxConnections != 0)
- {
- _maxConnections = _threadNum + 1;
- }
- else
- {
- _maxConnections = maxConnections;
- }
-}
-
-int
-IceInternal::ThreadPool::getMaxConnections()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- return _maxConnections;
-}
-
-IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
+IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) :
_instance(instance),
- _logger(_instance->logger()),
- _properties(_instance->properties()),
_destroyed(false),
_lastFd(INVALID_SOCKET),
- _clients(0),
- _servers(0),
+ _handlers(0),
_timeout(0)
{
SOCKET fds[2];
@@ -181,8 +110,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
_maxFd = _fdIntrRead;
_minFd = _fdIntrRead;
- _timeout = _properties->getPropertyAsInt("Ice.ServerIdleTime");
- _threadNum = _properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10);
+ if (server)
+ {
+ _timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime");
+ _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10);
+ }
+ else
+ {
+ _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 1);
+ }
+
if (_threadNum < 1)
{
_threadNum = 1;
@@ -205,9 +142,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
destroy();
throw;
}
-
- // Must be called after _threadNum is set.
- setMaxConnections(_properties->getPropertyAsInt("Ice.ThreadPool.MaxConnections"));
}
IceInternal::ThreadPool::~ThreadPool()
@@ -297,12 +231,13 @@ repeat:
void
IceInternal::ThreadPool::run()
{
+ ThreadPoolPtr self = this;
bool shutdown = false;
while (true)
{
_threadMutex.lock();
-
+
repeatSelect:
if (shutdown) // Shutdown has been initiated.
@@ -438,7 +373,7 @@ IceInternal::ThreadPool::run()
//
if (fdSet.fd_count == 0)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
goto repeatSelect;
}
@@ -490,7 +425,7 @@ IceInternal::ThreadPool::run()
if (loops > 1)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
goto repeatSelect;
}
@@ -501,7 +436,7 @@ IceInternal::ThreadPool::run()
map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
if(p == _handlerMap.end())
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "filedescriptor " << _lastFd << " not registered with the thread pool";
goto repeatSelect;
}
@@ -518,23 +453,14 @@ IceInternal::ThreadPool::run()
// Notify a handler about it's removal from the thread
// pool.
//
- handler->finished();
+ handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (handler->server())
- {
- assert(_servers > 0);
- --_servers;
- }
- else
- {
- assert(_clients > 0);
- --_clients;
- }
- if (_clients == 0 || _servers == 0)
+ assert(_handlers > 0);
+ if (--_handlers == 0)
{
- notifyAll(); // For waitUntil...Finished() methods.
+ notifyAll(); // For waitUntilFinished().
}
}
}
@@ -564,7 +490,7 @@ IceInternal::ThreadPool::run()
assert(stream.i == stream.b.end());
}
- handler->message(stream);
+ handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified.
}
}
}
@@ -640,12 +566,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
}
catch (const Exception& ex)
{
- Error out(_pool->_logger);
+ Error out(_pool->_instance->logger());
out << "exception in thread pool:\n" << ex;
}
catch (...)
{
- Error out(_pool->_logger);
+ Error out(_pool->_instance->logger());
out << "unknown exception in thread pool";
}