summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2001-12-10 17:38:20 +0000
committerMarc Laukien <marc@zeroc.com>2001-12-10 17:38:20 +0000
commit28968ed975c12236878f2dd03d73cdd1d898fbf3 (patch)
treefd8d645921d016c3513db1fcb4be7f9fe2ed5c3d /cpp/src/Ice/ThreadPool.cpp
parentfix (diff)
downloadice-28968ed975c12236878f2dd03d73cdd1d898fbf3.tar.bz2
ice-28968ed975c12236878f2dd03d73cdd1d898fbf3.tar.xz
ice-28968ed975c12236878f2dd03d73cdd1d898fbf3.zip
changes
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp262
1 files changed, 91 insertions, 171 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index c27f3f9864a..a5cd8a6de34 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -252,7 +252,6 @@ IceInternal::ThreadPool::run()
_threadMutex.lock();
EventHandlerPtr handler;
- bool reap;
repeatSelect:
@@ -308,62 +307,7 @@ IceInternal::ThreadPool::run()
//
return;
}
-
- bool interrupt = false;
-
-//
-// Optimization for WIN32 specific version of fd_set. Looping with a
-// FD_ISSET test like for Unix is very unefficient for WIN32.
-//
-#ifdef WIN32
- //
- // Round robin for the filedescriptors. The interrupt
- // filedescriptor has priority.
- //
- assert(fdSet.fd_count > 0);
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
- for (u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
- assert(fd != INVALID_SOCKET);
- if (fd == _fdIntrRead)
- {
- shutdown = clearInterrupt();
- interrupt = true;
- break;
- }
-
- if (_lastFd == INVALID_SOCKET || fd > _lastFd)
- {
- largerFd = min(largerFd, fd);
- }
-
- smallestFd = min(smallestFd, fd);
- }
-
- if (!interrupt)
- {
- if (largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
- }
-#else
- if (FD_ISSET(_fdIntrRead, &fdSet))
- {
- shutdown = clearInterrupt();
- interrupt = true;
- }
-#endif
-
if (!_adds.empty())
{
//
@@ -371,8 +315,7 @@ IceInternal::ThreadPool::run()
//
for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
{
- _reapList.push_front(p->first);
- _handlerMap[p->first] = make_pair(p->second, _reapList.begin());
+ _handlerMap.insert(*p);
FD_SET(p->first, &_fdSet);
_maxFd = max(_maxFd, p->first);
_minFd = min(_minFd, p->first);
@@ -387,16 +330,14 @@ IceInternal::ThreadPool::run()
//
for (vector<SOCKET>::iterator p = _removes.begin(); p != _removes.end(); ++p)
{
- map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator q = _handlerMap.find(*p);
+ map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(*p);
assert(q != _handlerMap.end());
FD_CLR(*p, &_fdSet);
- q->second.first->finished();
- if (q->second.first->server())
+ q->second->finished();
+ if (q->second->server())
{
--_servers;
}
-
- _reapList.erase(q->second.second);
_handlerMap.erase(q);
}
_removes.clear();
@@ -404,7 +345,7 @@ IceInternal::ThreadPool::run()
_minFd = _fdIntrRead;
if (!_handlerMap.empty())
{
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _maxFd = max(_maxFd, _handlerMap.rbegin()->first);
_minFd = min(_minFd, _handlerMap.begin()->first);
}
if (_handlerMap.empty() || _servers == 0)
@@ -412,136 +353,120 @@ IceInternal::ThreadPool::run()
notifyAll(); // For waitUntil...Finished() methods.
}
}
-
- if (interrupt)
- {
- goto repeatSelect;
- }
+//
+// Optimization for WIN32 specific version of fd_set. Looping with a
+// FD_ISSET test like for Unix is very unefficient for WIN32.
+//
+#ifdef WIN32
//
- // Check if there are connections to reap.
+ // Round robin for the filedescriptors.
//
- reap = false;
- // _handlerMap.size() is faster than _reapList() with most STLs.
- if (_maxConnections > 0 && _handlerMap.size() > static_cast<list<SOCKET>::size_type>(_maxConnections))
+ if (fdSet.fd_count == 0)
{
- for (list<SOCKET>::reverse_iterator p = _reapList.rbegin(); p != _reapList.rend(); ++p)
- {
- SOCKET fd = *p;
- if (fd != -1)
- {
- _reapList.pop_back();
- _reapList.push_front(-1);
- map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator q = _handlerMap.find(fd);
- q->second.second = _reapList.begin();
- handler = q->second.first;
- reap = true;
- break;
- }
- }
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
}
- if (!reap)
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+ for (u_short i = 0; i < fdSet.fd_count; ++i)
{
-#ifndef WIN32
- //
- // Round robin for the filedescriptors.
- //
- int loops = 0;
+ SOCKET fd = fdSet.fd_array[i];
+ assert(fd != INVALID_SOCKET);
- if (_lastFd < _minFd - 1)
+ if (fd > _lastFd || _lastFd == INVALID_SOCKET)
{
- _lastFd = _minFd - 1;
+ largerFd = min(largerFd, fd);
}
- do
- {
- if (++_lastFd > _maxFd)
- {
- ++loops;
- _lastFd = _minFd;
- }
- }
- while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+ smallestFd = min(smallestFd, fd);
+ }
+
+ if (largerFd <= _maxFd)
+ {
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
+ }
+#else
+ //
+ // Round robin for the filedescriptors.
+ //
+ if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
+ {
+ _lastFd = _minFd - 1;
+ }
- if (loops > 1)
+ int loops = 0;
+ do
+ {
+ if (++_lastFd > _maxFd)
{
- ostringstream s;
- s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- _instance->logger()->error(s.str());
- goto repeatSelect;
+ ++loops;
+ _lastFd = _minFd;
}
+ }
+ while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+
+ if (loops > 1)
+ {
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
+ }
#endif
-
- map<SOCKET, pair<EventHandlerPtr, list<SOCKET>::iterator> >::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
- {
- ostringstream s;
- s << "filedescriptor " << _lastFd << " not registered with the thread pool";
- _instance->logger()->error(s.str());
- goto repeatSelect;
- }
-
- //
- // Make the fd for the handler the most recently used one
- // by moving it to the beginning of the the reap list.
- //
- if (p->second.second != _reapList.begin())
- {
- _reapList.erase(p->second.second);
- _reapList.push_front(p->first);
- p->second.second = _reapList.begin();
- }
-
- handler = p->second.first;
+
+ if (_lastFd == _fdIntrRead)
+ {
+ shutdown = clearInterrupt();
+ goto repeatSelect;
+ }
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ ostringstream s;
+ s << "filedescriptor " << _lastFd << " not registered with the thread pool";
+ _instance->logger()->error(s.str());
+ goto repeatSelect;
}
+
+ handler = p->second;
}
- if (reap)
+ //
+ // If the handler is "readable", try to read a message.
+ //
+ BasicStream stream(_instance);
+ if (handler->readable())
{
- //
- // Reap the handler.
- //
try
{
- if (!handler->tryDestroy())
- {
- goto repeatSelect;
- }
+ read(handler);
}
- catch (const LocalException&)
+ catch (const TimeoutException&) // Expected.
{
- // Ignore exeptions.
+ goto repeatSelect;
}
- }
- else
- {
- //
- // If the handler is "readable", try to read a message.
- //
- BasicStream stream(_instance);
- if (handler->readable())
+ catch (const LocalException& ex)
{
- try
- {
- read(handler);
- }
- catch (const TimeoutException&) // Expected.
- {
- goto repeatSelect;
- }
- catch (const LocalException& ex)
- {
- handler->exception(ex);
- goto repeatSelect;
- }
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
+ handler->exception(ex);
+ goto repeatSelect;
}
- handler->message(stream);
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
}
+
+ handler->message(stream);
}
}
@@ -600,11 +525,6 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
}
void
-IceInternal::ThreadPool::reapConnections()
-{
-}
-
-void
IceInternal::ThreadPool::EventHandlerThread::run()
{
try