summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp384
-rw-r--r--cpp/src/Ice/ThreadPool.h2
2 files changed, 195 insertions, 191 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 936f96907af..5ed4d3eaecc 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -251,232 +251,236 @@ IceInternal::ThreadPool::run()
{
_threadMutex.lock();
- EventHandlerPtr handler;
-
- repeatSelect:
-
- if (shutdown) // Shutdown has been initiated.
- {
- shutdown = false;
- _instance->objectAdapterFactory()->shutdown();
- }
-
- fd_set fdSet;
- memcpy(&fdSet, &_fdSet, sizeof(fd_set));
- int ret;
- if (_timeout)
- {
- struct timeval tv;
- tv.tv_sec = _timeout;
- tv.tv_usec = 0;
- ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv);
- }
- else
+ while (true)
{
- ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
- }
-
- if (ret == 0) // Timeout.
- {
- assert(_timeout);
- _timeout = 0;
- shutdown = true;
- goto repeatSelect;
- }
-
- if (ret == SOCKET_ERROR)
- {
- if (interrupted())
+ if (shutdown) // Shutdown has been initiated.
{
- goto repeatSelect;
+ shutdown = false;
+ _instance->objectAdapterFactory()->shutdown();
}
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- {
- JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this);
+ fd_set fdSet;
+ memcpy(&fdSet, &_fdSet, sizeof(fd_set));
+ int ret;
+ if (_timeout)
+ {
+ struct timeval tv;
+ tv.tv_sec = _timeout;
+ tv.tv_usec = 0;
+ ret = ::select(_maxFd + 1, &fdSet, 0, 0, &tv);
+ }
+ else
+ {
+ ret = ::select(_maxFd + 1, &fdSet, 0, 0, 0);
+ }
- if (_destroyed)
+ if (ret == 0) // Timeout.
{
- //
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
- //
- return;
+ assert(_timeout);
+ _timeout = 0;
+ shutdown = true;
+ continue;
}
-
- if (!_adds.empty())
+
+ if (ret == SOCKET_ERROR)
{
- //
- // New handlers have been added.
- //
- for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
+ if (interrupted())
{
- _handlerMap.insert(*p);
- FD_SET(p->first, &_fdSet);
- _maxFd = max(_maxFd, p->first);
- _minFd = min(_minFd, p->first);
+ continue;
}
- _adds.clear();
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
- if (!_removes.empty())
+ EventHandlerPtr handler;
+
{
- //
- // Handlers are permanently removed.
- //
- for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p)
+ JTCSyncT<JTCMonitorT<JTCMutex> > sync(*this);
+
+ if (_destroyed)
{
- map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first);
- assert(q != _handlerMap.end());
- FD_CLR(p->first, &_fdSet);
- if (p->second) // Call finished() on the handler?
- {
- q->second->finished();
- }
- if (q->second->server())
- {
- --_servers;
- }
- _handlerMap.erase(q);
+ //
+ // Don't clear the interrupt fd if destroyed, so that
+ // the other threads exit as well.
+ //
+ return;
}
- _removes.clear();
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if (!_handlerMap.empty())
+
+ if (!_adds.empty())
{
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
+ //
+ // New handlers have been added.
+ //
+ for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
+ {
+ _handlerMap.insert(*p);
+ FD_SET(p->first, &_fdSet);
+ _maxFd = max(_maxFd, p->first);
+ _minFd = min(_minFd, p->first);
+ }
+ _adds.clear();
}
- if (_handlerMap.empty() || _servers == 0)
+
+ if (!_removes.empty())
{
- notifyAll(); // For waitUntil...Finished() methods.
+ //
+ // Handlers are permanently removed.
+ //
+ for (vector<pair<SOCKET, bool> >::iterator p = _removes.begin(); p != _removes.end(); ++p)
+ {
+ map<SOCKET, EventHandlerPtr>::iterator q = _handlerMap.find(p->first);
+ assert(q != _handlerMap.end());
+ FD_CLR(p->first, &_fdSet);
+ if (p->second) // Call finished() on the handler?
+ {
+ q->second->finished();
+ }
+ if (q->second->server())
+ {
+ --_servers;
+ }
+ _handlerMap.erase(q);
+ }
+ _removes.clear();
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if (!_handlerMap.empty())
+ {
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ if (_handlerMap.empty() || _servers == 0)
+ {
+ notifyAll(); // For waitUntil...Finished() methods.
+ }
+
+ //
+ // Selected filedescriptors may have changed, I
+ // therefore need to repeat the select().
+ //
+ shutdown = clearInterrupt();
+ continue;
}
-
- //
- // Selected filedescriptors may have changed, I
- // therefore need to repeat the select().
- //
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
-
+
//
// Optimization for WIN32 specific version of fd_set. Looping with a
// FD_ISSET test like for Unix is very unefficient for WIN32.
//
#ifdef WIN32
- //
- // Round robin for the filedescriptors.
- //
- if (fdSet.fd_count == 0)
- {
- ostringstream s;
- s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- _instance->logger()->error(s.str());
- goto repeatSelect;
- }
-
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
- for (u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
- assert(fd != INVALID_SOCKET);
-
- if (fd > _lastFd || _lastFd == INVALID_SOCKET)
+ //
+ // Round robin for the filedescriptors.
+ //
+ if (fdSet.fd_count == 0)
{
- largerFd = min(largerFd, fd);
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ continue;
+ }
+
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
+ for (u_short i = 0; i < fdSet.fd_count; ++i)
+ {
+ SOCKET fd = fdSet.fd_array[i];
+ assert(fd != INVALID_SOCKET);
+
+ if (fd > _lastFd || _lastFd == INVALID_SOCKET)
+ {
+ largerFd = min(largerFd, fd);
+ }
+
+ smallestFd = min(smallestFd, fd);
+ }
+
+ if (largerFd <= _maxFd)
+ {
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
}
-
- smallestFd = min(smallestFd, fd);
- }
-
- if (largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
#else
- //
- // Round robin for the filedescriptors.
- //
- if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
- {
- _lastFd = _minFd - 1;
- }
-
- int loops = 0;
- do
- {
- if (++_lastFd > _maxFd)
+ //
+ // Round robin for the filedescriptors.
+ //
+ if (_lastFd < _minFd - 1 || _lastFd == INVALID_SOCKET)
{
- ++loops;
- _lastFd = _minFd;
+ _lastFd = _minFd - 1;
+ }
+
+ int loops = 0;
+ do
+ {
+ if (++_lastFd > _maxFd)
+ {
+ ++loops;
+ _lastFd = _minFd;
+ }
+ }
+ while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
+
+ if (loops > 1)
+ {
+ ostringstream s;
+ s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
+ _instance->logger()->error(s.str());
+ continue;
}
- }
- while (!FD_ISSET(_lastFd, &fdSet) && loops <= 1);
-
- if (loops > 1)
- {
- ostringstream s;
- s << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
- _instance->logger()->error(s.str());
- goto repeatSelect;
- }
#endif
-
- if (_lastFd == _fdIntrRead)
- {
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
- {
- ostringstream s;
- s << "filedescriptor " << _lastFd << " not registered with the thread pool";
- _instance->logger()->error(s.str());
- goto repeatSelect;
+
+ if (_lastFd == _fdIntrRead)
+ {
+ shutdown = clearInterrupt();
+ continue;
+ }
+
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ ostringstream s;
+ s << "filedescriptor " << _lastFd << " not registered with the thread pool";
+ _instance->logger()->error(s.str());
+ continue;
+ }
+
+ handler = p->second;
}
- handler = p->second;
- }
+ assert(handler);
- //
- // If the handler is "readable", try to read a message.
- //
- BasicStream stream(_instance);
- if (handler->readable())
- {
- try
- {
- read(handler);
- }
- catch (const TimeoutException&) // Expected.
- {
- goto repeatSelect;
- }
- catch (const LocalException& ex)
+ //
+ // If the handler is "readable", try to read a message.
+ //
+ BasicStream stream(_instance);
+ if (handler->readable())
{
- handler->exception(ex);
- goto repeatSelect;
+ try
+ {
+ read(handler);
+ }
+ catch (const TimeoutException&) // Expected.
+ {
+ continue;
+ }
+ catch (const LocalException& ex)
+ {
+ handler->exception(ex);
+ continue;
+ }
+
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
}
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
- }
- handler->message(stream);
+ handler->message(stream);
+ break;
+ }
}
}
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 7a5932dc813..2c90624a865 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -72,7 +72,7 @@ private:
{
public:
- EventHandlerThread(ThreadPoolPtr pool) : _pool(pool) { }
+ EventHandlerThread(const ThreadPoolPtr& pool) : _pool(pool) { }
virtual void run();
private: