summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp245
1 files changed, 114 insertions, 131 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 3bce62e5bc7..687d0787a84 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -122,7 +122,7 @@ IceInternal::ThreadPool::destroy()
assert(_handlerMap.empty());
assert(_changes.empty());
_destroyed = true;
- setInterrupt(0);
+ setInterrupt();
}
void
@@ -131,7 +131,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, handler));
- setInterrupt(0);
+ setInterrupt();
}
void
@@ -140,7 +140,7 @@ IceInternal::ThreadPool::unregister(SOCKET fd)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
- setInterrupt(0);
+ setInterrupt();
}
void
@@ -186,16 +186,6 @@ IceInternal::ThreadPool::promoteFollower()
}
void
-IceInternal::ThreadPool::initiateShutdown()
-{
- //
- // This operation must be signal safe, so all we can do is to set
- // an interrupt.
- //
- setInterrupt(1);
-}
-
-void
IceInternal::ThreadPool::joinWithAllThreads()
{
//
@@ -215,7 +205,7 @@ IceInternal::ThreadPool::joinWithAllThreads()
#endif
}
-bool
+void
IceInternal::ThreadPool::clearInterrupt()
{
char c;
@@ -232,9 +222,7 @@ repeat:
SocketException ex(__FILE__, __LINE__);
ex.error = getSocketErrno();
- //throw ex;
- Error out(_instance->logger());
- out << "exception in `" << _prefix << "':\n" << ex;
+ throw ex;
}
#else
if(::read(_fdIntrRead, &c, 1) == -1)
@@ -246,18 +234,16 @@ repeat:
SyscallException ex(__FILE__, __LINE__);
ex.error = getSystemErrno();
- //throw ex;
- Error out(_instance->logger());
- out << "exception in `" << _prefix << "':\n" << ex;
+ throw ex;
}
#endif
-
- return c == 1; // Return true if shutdown has been initiated.
}
void
-IceInternal::ThreadPool::setInterrupt(char c)
+IceInternal::ThreadPool::setInterrupt()
{
+ char c = 0;
+
repeat:
#ifdef _WIN32
@@ -270,9 +256,7 @@ repeat:
SocketException ex(__FILE__, __LINE__);
ex.error = getSocketErrno();
- //throw ex;
- Error out(_instance->logger());
- out << "exception in `" << _prefix << "':\n" << ex;
+ throw ex;
}
#else
if(::write(_fdIntrWrite, &c, 1) == -1)
@@ -284,9 +268,7 @@ repeat:
SyscallException ex(__FILE__, __LINE__);
ex.error = getSystemErrno();
- //throw ex;
- Error out(_instance->logger());
- out << "exception in `" << _prefix << "':\n" << ex;
+ throw ex;
}
#endif
}
@@ -325,14 +307,6 @@ IceInternal::ThreadPool::run()
ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
}
- if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
- {
- assert(_timeout > 0);
- _timeout = 0;
- initiateShutdown();
- continue;
- }
-
if(ret == SOCKET_ERROR)
{
if(interrupted())
@@ -345,43 +319,48 @@ IceInternal::ThreadPool::run()
//throw ex;
Error out(_instance->logger());
out << "exception in `" << _prefix << "':\n" << ex;
+ continue;
}
EventHandlerPtr handler;
bool finished = false;
bool shutdown = false;
-
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(FD_ISSET(_fdIntrRead, &fdSet))
+
+ if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
{
- //
- // There are three possiblities for an interrupt:
- //
- // - The thread pool has been destroyed.
- //
- // - Server shutdown has been initiated.
- //
- // - An event handler was registered or unregistered.
- //
-
- //
- // Thread pool destroyed?
- //
- if(_destroyed)
+ assert(_timeout > 0);
+ _timeout = 0;
+ shutdown = true;
+ }
+ else
+ {
+ if(FD_ISSET(_fdIntrRead, &fdSet))
{
//
- // Don't clear the interrupt if destroyed, so that
- // the other threads exit as well.
+ // There are two possiblities for an interrupt:
+ //
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler was registered or unregistered.
//
- return true;
- }
-
- shutdown = clearInterrupt();
- if(!shutdown)
- {
+ //
+ // Thread pool destroyed?
+ //
+ if(_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that
+ // the other threads exit as well.
+ //
+ return true;
+ }
+
+ clearInterrupt();
+
//
// An event handler must have been registered or
// unregistered.
@@ -418,92 +397,96 @@ IceInternal::ThreadPool::run()
// the thread synchronization.
}
}
- }
- else
- {
+ else
+ {
//
// Optimization for WIN32 specific version of fd_set. Looping with a
// FD_ISSET test like for Unix is very inefficient for WIN32.
//
#ifdef _WIN32
- //
- // Round robin for the filedescriptors.
- //
- if(fdSet.fd_count == 0)
- {
- Error out(_instance->logger());
- out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable";
- continue;
- }
-
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
- for(u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
- assert(fd != INVALID_SOCKET);
+ //
+ // Round robin for the filedescriptors.
+ //
+ if(fdSet.fd_count == 0)
+ {
+ Error out(_instance->logger());
+ out << "select() in `" << _prefix << "' returned " << ret
+ << " but no filedescriptor is readable";
+ continue;
+ }
- if(fd > _lastFd || _lastFd == INVALID_SOCKET)
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+ for(u_short i = 0; i < fdSet.fd_count; ++i)
{
- largerFd = min(largerFd, fd);
+ SOCKET fd = fdSet.fd_array[i];
+ assert(fd != INVALID_SOCKET);
+
+ if(fd > _lastFd || _lastFd == INVALID_SOCKET)
+ {
+ largerFd = min(largerFd, fd);
+ }
+
+ smallestFd = min(smallestFd, fd);
}
- smallestFd = min(smallestFd, fd);
- }
-
- if(largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
+ if(largerFd <= _maxFd)
+ {
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
+ }
#else
- //
- // Round robin for the filedescriptors.
- //
- if(_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
- {
- _lastFd = _minFd - 1;
- }
-
- int loops = 0;
- do
- {
- if(++_lastFd > _maxFd)
+ //
+ // Round robin for the filedescriptors.
+ //
+ if(_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
{
- ++loops;
- _lastFd = _minFd;
+ _lastFd = _minFd - 1;
+ }
+
+ int loops = 0;
+ do
+ {
+ if(++_lastFd > _maxFd)
+ {
+ ++loops;
+ _lastFd = _minFd;
+ }
+ }
+ while(!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+
+ if(loops > 1)
+ {
+ Error out(_instance->logger());
+ out << "select() in `" << _prefix << "' returned " << ret
+ << " but no filedescriptor is readable";
+ continue;
}
- }
- while(!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
-
- if(loops > 1)
- {
- Error out(_instance->logger());
- out << "select() in `" << _prefix << "' returned " << ret << " but no filedescriptor is readable";
- continue;
- }
#endif
-
- assert(_lastFd != _fdIntrRead);
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
- {
- Error out(_instance->logger());
- out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
- continue;
+
+ assert(_lastFd != _fdIntrRead);
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ Error out(_instance->logger());
+ out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
+ continue;
+ }
+
+ handler = p->second;
}
-
- handler = p->second;
}
}
- assert(handler || shutdown);
+ //
+ // Now we are outside the thread synchronization.
+ //
if(shutdown)
{