diff options
author | Marc Laukien <marc@zeroc.com> | 2002-04-13 14:20:11 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2002-04-13 14:20:11 +0000 |
commit | 14eb5a90dadb90493192b35dcc241068a9c99550 (patch) | |
tree | b565a551fbfa6163de72015135b424c531a40930 /cpp/src/Ice/ThreadPool.cpp | |
parent | brought getLocalHost() back (diff) | |
download | ice-14eb5a90dadb90493192b35dcc241068a9c99550.tar.bz2 ice-14eb5a90dadb90493192b35dcc241068a9c99550.tar.xz ice-14eb5a90dadb90493192b35dcc241068a9c99550.zip |
fixes for the thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 243 |
1 files changed, 164 insertions, 79 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 6b7827663aa..b149265b717 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -34,16 +34,42 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { ++_servers; } - _adds.push_back(make_pair(fd, handler)); - setInterrupt(); + else + { + ++_clients; + } + _changes.push_back(make_pair(fd, handler)); + setInterrupt(0); +} + +void +IceInternal::ThreadPool::unregister(SOCKET fd) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _changes.push_back(make_pair(fd, EventHandlerPtr(0))); + setInterrupt(0); +} + +void +IceInternal::ThreadPool::serverIsNowClient() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + ++_clients; + assert(_servers > 0); + --_servers; + if (_servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. + } } void -IceInternal::ThreadPool::unregister(SOCKET fd, bool callFinished) +IceInternal::ThreadPool::clientIsNowServer() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _removes.push_back(make_pair(fd, callFinished)); - setInterrupt(); + ++_servers; + assert(_clients > 0); + --_clients; } void @@ -55,12 +81,7 @@ IceInternal::ThreadPool::promoteFollower() void IceInternal::ThreadPool::initiateServerShutdown() { - char c = 1; -#ifdef _WIN32 - ::send(_fdIntrWrite, &c, 1, 0); -#else - ::write(_fdIntrWrite, &c, 1); -#endif + setInterrupt(1); } void @@ -86,17 +107,21 @@ IceInternal::ThreadPool::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while (!_handlerMap.empty() && _threadNum != 0) + while (_clients + _servers != 0 && _threadNum != 0) { wait(); } - if (!_handlerMap.empty()) + if (_clients + _servers != 0) { Error out(_logger); out << "can't wait for graceful application termination in thread pool\n" << "since all threads have vanished"; } + else + { + assert(_handlerMap.empty()); + } } void @@ -141,6 +166,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : _properties(_instance->properties()), _destroyed(false), _lastFd(INVALID_SOCKET), + _clients(0), _servers(0), _timeout(0) { @@ -197,37 +223,74 @@ IceInternal::ThreadPool::destroy() IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _destroyed = true; - setInterrupt(); + setInterrupt(0); } bool IceInternal::ThreadPool::clearInterrupt() { - bool shutdown = false; char c; + +repeat: + #ifdef _WIN32 - while (::recv(_fdIntrRead, &c, 1, 0) == 1) -#else - while (::read(_fdIntrRead, &c, 1) == 1) -#endif + if (::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR) { - if (c == 1) // Shutdown initiated? + if (interrupted()) { - shutdown = true; + goto repeat; } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } +#else + if (::read(_fdIntrRead, &c, 1) == -1) + { + if (interrupted()) + { + goto repeat; + } - return shutdown; + SystemException ex(__FILE__, __LINE__); + ex.error = getSystemErrno(); + throw ex; + } +#endif + + return c == 1; // Return true if shutdown has been initiated. } void -IceInternal::ThreadPool::setInterrupt() +IceInternal::ThreadPool::setInterrupt(char c) { - char c = 0; +repeat: + #ifdef _WIN32 - ::send(_fdIntrWrite, &c, 1, 0); + if (::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) + { + if (interrupted()) + { + goto repeat; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } #else - ::write(_fdIntrWrite, &c, 1); + if (::write(_fdIntrWrite, &c, 1) == -1) + { + if (interrupted()) + { + goto repeat; + } + + SystemException ex(__FILE__, __LINE__); + ex.error = getSystemErrno(); + throw ex; + } #endif } @@ -288,49 +351,82 @@ IceInternal::ThreadPool::run() } EventHandlerPtr handler; - std::pair<SOCKET, bool> remove(INVALID_SOCKET, false); + bool finished = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if (_destroyed) + + if (FD_ISSET(_fdIntrRead, &fdSet)) { // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. + // There are three possiblities for an interrupt: // - return; - } + // - The thread pool has been destroyed. + // + // - Server shutdown has been initiated. + // + // - An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if (_destroyed) + { + // + // Don't clear the interrupt if destroyed, so that + // the other threads exit as well. + // + return; + } + + shutdown = clearInterrupt(); - if (!_adds.empty()) - { // - // New handlers have been added. + // Server shutdown? // - for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) + if (shutdown) { - _handlerMap.insert(*p); - FD_SET(p->first, &_fdSet); - _maxFd = max(_maxFd, p->first); - _minFd = min(_minFd, p->first); + goto repeatSelect; } - _adds.clear(); - } - - if (!_removes.empty()) - { + // - // Handlers are permanently removed. + // An event handler must have been registered or + // unregistered. // - remove = _removes.front(); - _removes.pop_front(); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); - assert(p != _handlerMap.end()); - FD_CLR(p->first, &_fdSet); - handler = p->second; + assert(!_changes.empty()); + pair<SOCKET, EventHandlerPtr> change = _changes.front(); + _changes.pop_front(); + + if (change.second) // Addition if handler is set. + { + _handlerMap.insert(change); + FD_SET(change.first, &_fdSet); + _maxFd = max(_maxFd, change.first); + _minFd = min(_minFd, change.first); + goto repeatSelect; + } + else // Removal if handler is not set. + { + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); + assert(p != _handlerMap.end()); + handler = p->second; + finished = true; + _handlerMap.erase(p); + FD_CLR(change.first, &_fdSet); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if (!_handlerMap.empty()) + { + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); + } + // Don't goto repeatSelect; we have to call + // finished() on the event handler below, outside + // the thread synchronization. + } } - - if (!handler) + else { // // Optimization for WIN32 specific version of fd_set. Looping with a @@ -400,11 +496,7 @@ IceInternal::ThreadPool::run() } #endif - if (_lastFd == _fdIntrRead) - { - shutdown = clearInterrupt(); - goto repeatSelect; - } + assert(_lastFd != _fdIntrRead); map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) @@ -420,34 +512,27 @@ IceInternal::ThreadPool::run() assert(handler); - if (remove.first != INVALID_SOCKET) + if (finished) { // - // Call finished() on a handler if necessary. + // Notify a handler about it's removal from the thread + // pool. // - if (remove.second) - { - handler->finished(); - } - + handler->finished(); + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); - assert(p != _handlerMap.end()); - _handlerMap.erase(p); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if (!_handlerMap.empty()) - { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); - } if (handler->server()) { + assert(_servers > 0); --_servers; } - if (_handlerMap.empty() || _servers == 0) + else + { + assert(_clients > 0); + --_clients; + } + if (_clients == 0 || _servers == 0) { notifyAll(); // For waitUntil...Finished() methods. } |