summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2001-12-11 15:48:11 +0000
committerMarc Laukien <marc@zeroc.com>2001-12-11 15:48:11 +0000
commitc3253a00787b7c2dc84c81b269bc368ec2a2043f (patch)
tree747703a29c3cb487f556add7e00dd6b30866c3da /cpp/src/Ice/ThreadPool.cpp
parentfixes (diff)
downloadice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.tar.bz2
ice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.tar.xz
ice-c3253a00787b7c2dc84c81b269bc368ec2a2043f.zip
fixes
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp384
1 files changed, 190 insertions, 194 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 5ed4d3eaecc..a2bd4415d25 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -251,236 +251,232 @@ IceInternal::ThreadPool::run()
{
_threadMutex.lock();
- while (true)
+ repeatSelect:
+
+ if (shutdown) // Shutdown has been initiated.
+ {
+ shutdown = false;
+ _instance->objectAdapterFactory()->shutdown();
+ }
+
+ fd_set fdSet;
+ memcpy(&fdSet, &_fdSet, sizeof(fd_set));
+ int ret;
+ if (_timeout)
{
- if (shutdown) // Shutdown has been initiated.
+ struct timeval tv;
+ tv.tv_sec = _timeout;
+ tv.tv_usec = 0;
+ ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv);
+ }
+ else
+ {
+ ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
+ }
+
+ if (ret == 0) // Timeout.
+ {
+ assert(_timeout);
+ _timeout = 0;
+ shutdown = true;
+ goto repeatSelect;
+ }
+
+ if (ret == SOCKET_ERROR)
+ {
+ if (interrupted())
{
- shutdown = false;
- _instance->objectAdapterFactory()->shutdown();
+ goto repeatSelect;
}
- fd_set fdSet;
- memcpy(&fdSet, &_fdSet, sizeof(fd_set));
- int ret;
- if (_timeout)
- {
- struct timeval tv;
- tv.tv_sec = _timeout;
- tv.tv_usec = 0;
- ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv);
- }
- else
- {
- ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
- }
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ EventHandlerPtr handler;
+
+ {
+ JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this);
- if (ret == 0) // Timeout.
+ if (_destroyed)
{
- assert(_timeout);
- _timeout = 0;
- shutdown = true;
- continue;
+ //
+ // Don't clear the interrupt fd if destroyed, so that
+ // the other threads exit as well.
+ //
+ return;
}
-
- if (ret == SOCKET_ERROR)
+
+ if (!_adds.empty())
{
- if (interrupted())
+ //
+ // New handlers have been added.
+ //
+ for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
{
- continue;
+ _handlerMap.insert(*p);
+ FD_SET(p->first, &_fdSet);
+ _maxFd = max(_maxFd, p->first);
+ _minFd = min(_minFd, p->first);
}
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ _adds.clear();
}
- EventHandlerPtr handler;
-
+ if (!_removes.empty())
{
- JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this);
-
- if (_destroyed)
- {
- //
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
- //
- return;
- }
-
- if (!_adds.empty())
- {
- //
- // New handlers have been added.
- //
- for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
- {
- _handlerMap.insert(*p);
- FD_SET(p->first, &_fdSet);
- _maxFd = max(_maxFd, p->first);
- _minFd = min(_minFd, p->first);
- }
- _adds.clear();
- }
-
- if (!_removes.empty())
- {
- //
- // Handlers are permanently removed.
- //
- for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p)
- {
- map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first);
- assert(q != _handlerMap.end());
- FD_CLR(p->first, &_fdSet);
- if (p->second) // Call finished() on the handler?
- {
- q->second->finished();
- }
- if (q->second->server())
- {
- --_servers;
- }
- _handlerMap.erase(q);
- }
- _removes.clear();
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if (!_handlerMap.empty())
- {
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
- }
- if (_handlerMap.empty() || _servers == 0)
- {
- notifyAll(); // For waitUntil...Finished() methods.
- }
-
- //
- // Selected filedescriptors may have changed, I
- // therefore need to repeat the select().
- //
- shutdown = clearInterrupt();
- continue;
- }
-
-//
-// Optimization for WIN32 specific version of fd_set. Looping with a
-// FD_ISSET test like for Unix is very unefficient for WIN32.
-//
-#ifdef WIN32
//
- // Round robin for the filedescriptors.
+ // Handlers are permanently removed.
//
- if (fdSet.fd_count == 0)
- {
- ostringstream s;
- s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- _instance->logger()->error(s.str());
- continue;
- }
-
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
- for (u_short i = 0; i < fdSet.fd_count; ++i)
+ for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p)
{
- SOCKET fd = fdSet.fd_array[i];
- assert(fd != INVALID_SOCKET);
-
- if (fd > _lastFd || _lastFd == INVALID_SOCKET)
+ map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first);
+ assert(q != _handlerMap.end());
+ FD_CLR(p->first, &_fdSet);
+ if (p->second) // Call finished() on the handler?
{
- largerFd = min(largerFd, fd);
+ q->second->finished();
}
-
- smallestFd = min(smallestFd, fd);
- }
-
- if (largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
-#else
- //
- // Round robin for the filedescriptors.
- //
- if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
- {
- _lastFd = _minFd - 1;
- }
-
- int loops = 0;
- do
- {
- if (++_lastFd > _maxFd)
+ if (q->second->server())
{
- ++loops;
- _lastFd = _minFd;
+ --_servers;
}
+ _handlerMap.erase(q);
}
- while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
-
- if (loops > 1)
+ _removes.clear();
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if (!_handlerMap.empty())
{
- ostringstream s;
- s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- _instance->logger()->error(s.str());
- continue;
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
}
-#endif
-
- if (_lastFd == _fdIntrRead)
+ if (_handlerMap.empty() || _servers == 0)
{
- shutdown = clearInterrupt();
- continue;
+ notifyAll(); // For waitUntil...Finished() methods.
}
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
+
+ //
+ // Selected filedescriptors may have changed, I
+ // therefore need to repeat the select().
+ //
+ shutdown = clearInterrupt();
+ goto repeatSelect;
+ }
+
+//
+// Optimization for WIN32 specific version of fd_set. Looping with a
+// FD_ISSET test like for Unix is very unefficient for WIN32.
+//
+#ifdef WIN32
+ //
+ // Round robin for the filedescriptors.
+ //
+ if (fdSet.fd_count == 0)
+ {
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
+ }
+
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+ for (u_short i = 0; i < fdSet.fd_count; ++i)
+ {
+ SOCKET fd = fdSet.fd_array[i];
+ assert(fd != INVALID_SOCKET);
+
+ if (fd > _lastFd || _lastFd == INVALID_SOCKET)
{
- ostringstream s;
- s << "filedescriptor " << _lastFd << " not registered with the thread pool";
- _instance->logger()->error(s.str());
- continue;
+ largerFd = min(largerFd, fd);
}
-
- handler = p->second;
+
+ smallestFd = min(smallestFd, fd);
}
-
- assert(handler);
+ if (largerFd <= _maxFd)
+ {
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
+ }
+#else
//
- // If the handler is "readable", try to read a message.
+ // Round robin for the filedescriptors.
//
- BasicStream stream(_instance);
- if (handler->readable())
+ if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
{
- try
- {
- read(handler);
- }
- catch (const TimeoutException&) // Expected.
- {
- continue;
- }
- catch (const LocalException& ex)
+ _lastFd = _minFd - 1;
+ }
+
+ int loops = 0;
+ do
+ {
+ if (++_lastFd > _maxFd)
{
- handler->exception(ex);
- continue;
+ ++loops;
+ _lastFd = _minFd;
}
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
}
-
- handler->message(stream);
- break;
+ while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+
+ if (loops > 1)
+ {
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
+ }
+#endif
+
+ if (_lastFd == _fdIntrRead)
+ {
+ shutdown = clearInterrupt();
+ goto repeatSelect;
+ }
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ ostringstream s;
+ s << "filedescriptor " << _lastFd << " not registered with the thread pool";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
+ }
+
+ handler = p->second;
+ }
+
+ //
+ // If the handler is "readable", try to read a message.
+ //
+ BasicStream stream(_instance);
+ if (handler->readable())
+ {
+ try
+ {
+ read(handler);
+ }
+ catch (const TimeoutException&) // Expected.
+ {
+ goto repeatSelect;
+ }
+ catch (const LocalException& ex)
+ {
+ handler->exception(ex);
+ goto repeatSelect;
+ }
+
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
}
+
+ handler->message(stream);
}
}