summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2002-04-13 14:20:11 +0000
committerMarc Laukien <marc@zeroc.com>2002-04-13 14:20:11 +0000
commit14eb5a90dadb90493192b35dcc241068a9c99550 (patch)
treeb565a551fbfa6163de72015135b424c531a40930 /cpp/src/Ice/ThreadPool.cpp
parentbrought getLocalHost() back (diff)
downloadice-14eb5a90dadb90493192b35dcc241068a9c99550.tar.bz2
ice-14eb5a90dadb90493192b35dcc241068a9c99550.tar.xz
ice-14eb5a90dadb90493192b35dcc241068a9c99550.zip
fixes for the thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp243
1 files changed, 164 insertions, 79 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 6b7827663aa..b149265b717 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -34,16 +34,42 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
++_servers;
}
- _adds.push_back(make_pair(fd, handler));
- setInterrupt();
+ else
+ {
+ ++_clients;
+ }
+ _changes.push_back(make_pair(fd, handler));
+ setInterrupt(0);
+}
+
+void
+IceInternal::ThreadPool::unregister(SOCKET fd)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ _changes.push_back(make_pair(fd, EventHandlerPtr(0)));
+ setInterrupt(0);
+}
+
+void
+IceInternal::ThreadPool::serverIsNowClient()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ ++_clients;
+ assert(_servers > 0);
+ --_servers;
+ if (_servers == 0)
+ {
+ notifyAll(); // For waitUntil...Finished() methods.
+ }
}
void
-IceInternal::ThreadPool::unregister(SOCKET fd, bool callFinished)
+IceInternal::ThreadPool::clientIsNowServer()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _removes.push_back(make_pair(fd, callFinished));
- setInterrupt();
+ ++_servers;
+ assert(_clients > 0);
+ --_clients;
}
void
@@ -55,12 +81,7 @@ IceInternal::ThreadPool::promoteFollower()
void
IceInternal::ThreadPool::initiateServerShutdown()
{
- char c = 1;
-#ifdef _WIN32
- ::send(_fdIntrWrite, &c, 1, 0);
-#else
- ::write(_fdIntrWrite, &c, 1);
-#endif
+ setInterrupt(1);
}
void
@@ -86,17 +107,21 @@ IceInternal::ThreadPool::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while (!_handlerMap.empty() && _threadNum != 0)
+ while (_clients + _servers != 0 && _threadNum != 0)
{
wait();
}
- if (!_handlerMap.empty())
+ if (_clients + _servers != 0)
{
Error out(_logger);
out << "can't wait for graceful application termination in thread pool\n"
<< "since all threads have vanished";
}
+ else
+ {
+ assert(_handlerMap.empty());
+ }
}
void
@@ -141,6 +166,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
_properties(_instance->properties()),
_destroyed(false),
_lastFd(INVALID_SOCKET),
+ _clients(0),
_servers(0),
_timeout(0)
{
@@ -197,37 +223,74 @@ IceInternal::ThreadPool::destroy()
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
- setInterrupt();
+ setInterrupt(0);
}
bool
IceInternal::ThreadPool::clearInterrupt()
{
- bool shutdown = false;
char c;
+
+repeat:
+
#ifdef _WIN32
- while (::recv(_fdIntrRead, &c, 1, 0) == 1)
-#else
- while (::read(_fdIntrRead, &c, 1) == 1)
-#endif
+ if (::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR)
{
- if (c == 1) // Shutdown initiated?
+ if (interrupted())
{
- shutdown = true;
+ goto repeat;
}
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
+#else
+ if (::read(_fdIntrRead, &c, 1) == -1)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
- return shutdown;
+ SystemException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
+ }
+#endif
+
+ return c == 1; // Return true if shutdown has been initiated.
}
void
-IceInternal::ThreadPool::setInterrupt()
+IceInternal::ThreadPool::setInterrupt(char c)
{
- char c = 0;
+repeat:
+
#ifdef _WIN32
- ::send(_fdIntrWrite, &c, 1, 0);
+ if (::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
#else
- ::write(_fdIntrWrite, &c, 1);
+ if (::write(_fdIntrWrite, &c, 1) == -1)
+ {
+ if (interrupted())
+ {
+ goto repeat;
+ }
+
+ SystemException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
+ }
#endif
}
@@ -288,49 +351,82 @@ IceInternal::ThreadPool::run()
}
EventHandlerPtr handler;
- std::pair<SOCKET, bool> remove(INVALID_SOCKET, false);
+ bool finished = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if (_destroyed)
+
+ if (FD_ISSET(_fdIntrRead, &fdSet))
{
//
- // Don't clear the interrupt fd if destroyed, so that
- // the other threads exit as well.
+ // There are three possiblities for an interrupt:
//
- return;
- }
+ // - The thread pool has been destroyed.
+ //
+ // - Server shutdown has been initiated.
+ //
+ // - An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if (_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that
+ // the other threads exit as well.
+ //
+ return;
+ }
+
+ shutdown = clearInterrupt();
- if (!_adds.empty())
- {
//
- // New handlers have been added.
+ // Server shutdown?
//
- for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p)
+ if (shutdown)
{
- _handlerMap.insert(*p);
- FD_SET(p->first, &_fdSet);
- _maxFd = max(_maxFd, p->first);
- _minFd = min(_minFd, p->first);
+ goto repeatSelect;
}
- _adds.clear();
- }
-
- if (!_removes.empty())
- {
+
//
- // Handlers are permanently removed.
+ // An event handler must have been registered or
+ // unregistered.
//
- remove = _removes.front();
- _removes.pop_front();
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
- assert(p != _handlerMap.end());
- FD_CLR(p->first, &_fdSet);
- handler = p->second;
+ assert(!_changes.empty());
+ pair<SOCKET, EventHandlerPtr> change = _changes.front();
+ _changes.pop_front();
+
+ if (change.second) // Addition if handler is set.
+ {
+ _handlerMap.insert(change);
+ FD_SET(change.first, &_fdSet);
+ _maxFd = max(_maxFd, change.first);
+ _minFd = min(_minFd, change.first);
+ goto repeatSelect;
+ }
+ else // Removal if handler is not set.
+ {
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
+ assert(p != _handlerMap.end());
+ handler = p->second;
+ finished = true;
+ _handlerMap.erase(p);
+ FD_CLR(change.first, &_fdSet);
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if (!_handlerMap.empty())
+ {
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ // Don't goto repeatSelect; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
+ }
}
-
- if (!handler)
+ else
{
//
// Optimization for WIN32 specific version of fd_set. Looping with a
@@ -400,11 +496,7 @@ IceInternal::ThreadPool::run()
}
#endif
- if (_lastFd == _fdIntrRead)
- {
- shutdown = clearInterrupt();
- goto repeatSelect;
- }
+ assert(_lastFd != _fdIntrRead);
map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
if(p == _handlerMap.end())
@@ -420,34 +512,27 @@ IceInternal::ThreadPool::run()
assert(handler);
- if (remove.first != INVALID_SOCKET)
+ if (finished)
{
//
- // Call finished() on a handler if necessary.
+ // Notify a handler about it's removal from the thread
+ // pool.
//
- if (remove.second)
- {
- handler->finished();
- }
-
+ handler->finished();
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first);
- assert(p != _handlerMap.end());
- _handlerMap.erase(p);
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if (!_handlerMap.empty())
- {
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
- }
if (handler->server())
{
+ assert(_servers > 0);
--_servers;
}
- if (_handlerMap.empty() || _servers == 0)
+ else
+ {
+ assert(_clients > 0);
+ --_clients;
+ }
+ if (_clients == 0 || _servers == 0)
{
notifyAll(); // For waitUntil...Finished() methods.
}