summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp252
1 files changed, 133 insertions, 119 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 5c8caa70197..82ceade9ff7 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -288,7 +288,8 @@ IceInternal::ThreadPool::run()
}
EventHandlerPtr handler;
-
+ std::pair<SOCKET, bool> remove(INVALID_SOCKET, false);
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -321,152 +322,165 @@ IceInternal::ThreadPool::run()
//
// Handlers are permanently removed.
//
- for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p)
+ remove = _removes.front();
+ _removes.pop_front();
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
+ assert(p != _handlerMap.end());
+ FD_CLR(p->first, &_fdSet);
+ handler = p->second;
+ }
+
+ if (!handler)
+ {
+//
+// Optimization for WIN32 specific version of fd_set. Looping with a
+// FD_ISSET test like for Unix is very unefficient for WIN32.
+//
+#ifdef WIN32
+ //
+ // Round robin for the filedescriptors.
+ //
+ if (fdSet.fd_count == 0)
{
- map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first);
- assert(q != _handlerMap.end());
- FD_CLR(p->first, &_fdSet);
- if (p->second) // Call finished() on the handler?
- {
- q->second->finished();
- }
- if (q->second->server())
+ Error out(_logger);
+ out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ goto repeatSelect;
+ }
+
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+ for (u_short i = 0; i < fdSet.fd_count; ++i)
+ {
+ SOCKET fd = fdSet.fd_array[i];
+ assert(fd != INVALID_SOCKET);
+
+ if (fd > _lastFd || _lastFd == INVALID_SOCKET)
{
- --_servers;
+ largerFd = min(largerFd, fd);
}
- _handlerMap.erase(q);
+
+ smallestFd = min(smallestFd, fd);
}
- _removes.clear();
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if (!_handlerMap.empty())
+
+ if (largerFd <= _maxFd)
{
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
}
- if (_handlerMap.empty() || _servers == 0)
+ else
{
- notifyAll(); // For waitUntil...Finished() methods.
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
}
-
+#else
//
- // Selected filedescriptors may have changed, I
- // therefore need to repeat the select().
+ // Round robin for the filedescriptors.
//
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
-
-//
-// Optimization for WIN32 specific version of fd_set. Looping with a
-// FD_ISSET test like for Unix is very unefficient for WIN32.
-//
-#ifdef WIN32
- //
- // Round robin for the filedescriptors.
- //
- if (fdSet.fd_count == 0)
- {
- Error out(_logger);
- out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- goto repeatSelect;
- }
-
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
- for (u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
- assert(fd != INVALID_SOCKET);
-
- if (fd > _lastFd || _lastFd == INVALID_SOCKET)
+ if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
{
- largerFd = min(largerFd, fd);
+ _lastFd = _minFd - 1;
}
-
- smallestFd = min(smallestFd, fd);
+
+ int loops = 0;
+ do
+ {
+ if (++_lastFd > _maxFd)
+ {
+ ++loops;
+ _lastFd = _minFd;
+ }
+ }
+ while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+
+ if (loops > 1)
+ {
+ Error out(_logger);
+ out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ goto repeatSelect;
+ }
+#endif
+
+ if (_lastFd == _fdIntrRead)
+ {
+ shutdown = clearInterrupt();
+ goto repeatSelect;
+ }
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ Error out(_logger);
+ out << "filedescriptor " << _lastFd << " not registered with the thread pool";
+ goto repeatSelect;
+ }
+
+ handler = p->second;
}
+ }
+
+ assert(handler);
- if (largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
-#else
+ if (remove.first != INVALID_SOCKET)
+ {
//
- // Round robin for the filedescriptors.
+ // Call finished() on a handler if necessary.
//
- if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
+ if (remove.second)
{
- _lastFd = _minFd - 1;
+ handler->finished();
}
-
- int loops = 0;
- do
+
{
- if (++_lastFd > _maxFd)
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
+ assert(p != _handlerMap.end());
+ _handlerMap.erase(p);
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if (!_handlerMap.empty())
{
- ++loops;
- _lastFd = _minFd;
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ if (handler->server())
+ {
+ --_servers;
+ }
+ if (_handlerMap.empty() || _servers == 0)
+ {
+ notifyAll(); // For waitUntil...Finished() methods.
}
}
- while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
-
- if (loops > 1)
- {
- Error out(_logger);
- out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- goto repeatSelect;
- }
-#endif
-
- if (_lastFd == _fdIntrRead)
- {
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
- {
- Error out(_logger);
- out << "filedescriptor " << _lastFd << " not registered with the thread pool";
- goto repeatSelect;
- }
-
- handler = p->second;
}
-
- //
- // If the handler is "readable", try to read a message.
- //
- BasicStream stream(_instance);
- if (handler->readable())
+ else
{
- try
- {
- read(handler);
- }
- catch (const TimeoutException&) // Expected.
- {
- goto repeatSelect;
- }
- catch (const LocalException& ex)
+ //
+ // If the handler is "readable", try to read a message.
+ //
+ BasicStream stream(_instance);
+ if (handler->readable())
{
- handler->exception(ex);
- goto repeatSelect;
+ try
+ {
+ read(handler);
+ }
+ catch (const TimeoutException&) // Expected.
+ {
+ goto repeatSelect;
+ }
+ catch (const LocalException& ex)
+ {
+ handler->exception(ex);
+ goto repeatSelect;
+ }
+
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
}
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
+ handler->message(stream);
}
-
- handler->message(stream);
}
}