diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 24 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 58 | ||||
-rw-r--r-- | cpp/src/Ice/Network.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/SslAcceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/SslConnector.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/TcpAcceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/TcpConnector.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 243 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 12 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 2 |
10 files changed, 235 insertions, 118 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 01ecb5fe6e5..81e43236ce0 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -251,6 +251,17 @@ void IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) { IceUtil::RecMutex::Lock sync(*this); + + if (adapter && !_adapter) + { + _threadPool->clientIsNowServer(); + } + + if (!adapter && _adapter) + { + _threadPool->serverIsNowClient(); + } + _adapter = adapter; } @@ -538,11 +549,14 @@ IceInternal::Connection::finished() { IceUtil::RecMutex::Lock sync(*this); - _threadPool->promoteFollower(); + assert(_state == StateClosed || _state == StateHolding); - assert(_state == StateClosed); + _threadPool->promoteFollower(); - _transceiver->close(); + if (_state == StateClosed) + { + _transceiver->close(); + } } void @@ -699,7 +713,7 @@ IceInternal::Connection::setState(State state) { return; } - _threadPool->unregister(_transceiver->fd(), false); + _threadPool->unregister(_transceiver->fd()); break; } @@ -730,7 +744,7 @@ IceInternal::Connection::setState(State state) // _threadPool->_register(_transceiver->fd(), this); } - _threadPool->unregister(_transceiver->fd(), true); + _threadPool->unregister(_transceiver->fd()); break; } } diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 378ce96d84a..9495c50983f 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -358,41 +358,45 @@ IceInternal::IncomingConnectionFactory::finished() { IceUtil::Mutex::Lock sync(*this); + assert(_state == StateClosed || _state == StateHolding); + _threadPool->promoteFollower(); - assert(_state == StateClosed); - assert(_connections.empty()); - - try + if (_state == StateClosed) { - // - // Clear listen() backlog properly by accepting all queued - // connections, and then shutting them down. - // - while (true) + assert(_connections.empty()); + + try { - try - { - TransceiverPtr transceiver = _acceptor->accept(0); - ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter); - connection->exception(ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - } - catch (const TimeoutException&) + // + // Clear listen() backlog properly by accepting all queued + // connections, and then shutting them down. + // + while (true) { - break; // Exit loop on timeout. + try + { + TransceiverPtr transceiver = _acceptor->accept(0); + ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter); + connection->exception(ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + } + catch (const TimeoutException&) + { + break; // Exit loop on timeout. + } } } - } - catch (const LocalException& ex) - { - if (_warn) + catch (const LocalException& ex) { - Warning out(_instance->logger()); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + if (_warn) + { + Warning out(_instance->logger()); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } } + + _acceptor->close(); } - - _acceptor->close(); } void @@ -493,7 +497,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) if (_threadPool) { - _threadPool->unregister(_acceptor->fd(), false); + _threadPool->unregister(_acceptor->fd()); } for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::hold)); @@ -512,7 +516,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { _threadPool->_register(_acceptor->fd(), this); } - _threadPool->unregister(_acceptor->fd(), true); + _threadPool->unregister(_acceptor->fd()); } #ifdef _STLP_BEGIN_NAMESPACE diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index 6b20964273c..326c599b89b 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -232,8 +232,6 @@ IceInternal::createSocket(bool udp) throw ex; } - setBlock(fd, false); - if (!udp) { setTcpNoDelay(fd); @@ -674,6 +672,7 @@ IceInternal::createPipe(SOCKET fds[2]) #ifdef _WIN32 SOCKET fd = createSocket(false); + setBlock(fd, true); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); @@ -687,6 +686,7 @@ IceInternal::createPipe(SOCKET fds[2]) try { fds[0] = createSocket(false); + setBlock(fds[0], true); } catch(...) { @@ -698,6 +698,7 @@ IceInternal::createPipe(SOCKET fds[2]) { doConnect(fds[0], addr, -1); fds[1] = doAccept(fd, -1); + setBlock(fds[1], true); } catch(...) { @@ -717,6 +718,9 @@ IceInternal::createPipe(SOCKET fds[2]) throw ex; } + setBlock(fds[0], true); + setBlock(fds[1], true); + #endif } diff --git a/cpp/src/Ice/SslAcceptor.cpp b/cpp/src/Ice/SslAcceptor.cpp index 15c28498d75..f421a7d2fe5 100644 --- a/cpp/src/Ice/SslAcceptor.cpp +++ b/cpp/src/Ice/SslAcceptor.cpp @@ -84,6 +84,7 @@ TransceiverPtr IceInternal::SslAcceptor::accept(int timeout) { SOCKET fd = doAccept(_fd, timeout); + setBlock(fd, false); if (_traceLevels->network >= 1) { @@ -136,6 +137,7 @@ IceInternal::SslAcceptor::SslAcceptor(const InstancePtr& instance, const string& try { _fd = createSocket(false); + setBlock(_fd, false); getAddress(host, port, _addr); doBind(_fd, _addr); } diff --git a/cpp/src/Ice/SslConnector.cpp b/cpp/src/Ice/SslConnector.cpp index fa14f529dbc..09d51af325c 100644 --- a/cpp/src/Ice/SslConnector.cpp +++ b/cpp/src/Ice/SslConnector.cpp @@ -49,6 +49,7 @@ IceInternal::SslConnector::connect(int timeout) } SOCKET fd = createSocket(false); + setBlock(fd, false); doConnect(fd, _addr, timeout); if (_traceLevels->network >= 1) diff --git a/cpp/src/Ice/TcpAcceptor.cpp b/cpp/src/Ice/TcpAcceptor.cpp index 84691e16e2e..4217668f956 100644 --- a/cpp/src/Ice/TcpAcceptor.cpp +++ b/cpp/src/Ice/TcpAcceptor.cpp @@ -64,6 +64,7 @@ TransceiverPtr IceInternal::TcpAcceptor::accept(int timeout) { SOCKET fd = doAccept(_fd, timeout); + setBlock(fd, false); if (_traceLevels->network >= 1) { @@ -108,6 +109,7 @@ IceInternal::TcpAcceptor::TcpAcceptor(const InstancePtr& instance, const string& try { _fd = createSocket(false); + setBlock(_fd, false); getAddress(host, port, _addr); doBind(_fd, _addr); } diff --git a/cpp/src/Ice/TcpConnector.cpp b/cpp/src/Ice/TcpConnector.cpp index c00d6789667..236198804a7 100644 --- a/cpp/src/Ice/TcpConnector.cpp +++ b/cpp/src/Ice/TcpConnector.cpp @@ -30,6 +30,7 @@ IceInternal::TcpConnector::connect(int timeout) } SOCKET fd = createSocket(false); + setBlock(fd, false); doConnect(fd, _addr, timeout); if (_traceLevels->network >= 1) diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 6b7827663aa..b149265b717 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -34,16 +34,42 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { ++_servers; } - _adds.push_back(make_pair(fd, handler)); - setInterrupt(); + else + { + ++_clients; + } + _changes.push_back(make_pair(fd, handler)); + setInterrupt(0); +} + +void +IceInternal::ThreadPool::unregister(SOCKET fd) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _changes.push_back(make_pair(fd, EventHandlerPtr(0))); + setInterrupt(0); +} + +void +IceInternal::ThreadPool::serverIsNowClient() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + ++_clients; + assert(_servers > 0); + --_servers; + if (_servers == 0) + { + notifyAll(); // For waitUntil...Finished() methods. + } } void -IceInternal::ThreadPool::unregister(SOCKET fd, bool callFinished) +IceInternal::ThreadPool::clientIsNowServer() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _removes.push_back(make_pair(fd, callFinished)); - setInterrupt(); + ++_servers; + assert(_clients > 0); + --_clients; } void @@ -55,12 +81,7 @@ IceInternal::ThreadPool::promoteFollower() void IceInternal::ThreadPool::initiateServerShutdown() { - char c = 1; -#ifdef _WIN32 - ::send(_fdIntrWrite, &c, 1, 0); -#else - ::write(_fdIntrWrite, &c, 1); -#endif + setInterrupt(1); } void @@ -86,17 +107,21 @@ IceInternal::ThreadPool::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while (!_handlerMap.empty() && _threadNum != 0) + while (_clients + _servers != 0 && _threadNum != 0) { wait(); } - if (!_handlerMap.empty()) + if (_clients + _servers != 0) { Error out(_logger); out << "can't wait for graceful application termination in thread pool\n" << "since all threads have vanished"; } + else + { + assert(_handlerMap.empty()); + } } void @@ -141,6 +166,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : _properties(_instance->properties()), _destroyed(false), _lastFd(INVALID_SOCKET), + _clients(0), _servers(0), _timeout(0) { @@ -197,37 +223,74 @@ IceInternal::ThreadPool::destroy() IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _destroyed = true; - setInterrupt(); + setInterrupt(0); } bool IceInternal::ThreadPool::clearInterrupt() { - bool shutdown = false; char c; + +repeat: + #ifdef _WIN32 - while (::recv(_fdIntrRead, &c, 1, 0) == 1) -#else - while (::read(_fdIntrRead, &c, 1) == 1) -#endif + if (::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR) { - if (c == 1) // Shutdown initiated? + if (interrupted()) { - shutdown = true; + goto repeat; } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } +#else + if (::read(_fdIntrRead, &c, 1) == -1) + { + if (interrupted()) + { + goto repeat; + } - return shutdown; + SystemException ex(__FILE__, __LINE__); + ex.error = getSystemErrno(); + throw ex; + } +#endif + + return c == 1; // Return true if shutdown has been initiated. } void -IceInternal::ThreadPool::setInterrupt() +IceInternal::ThreadPool::setInterrupt(char c) { - char c = 0; +repeat: + #ifdef _WIN32 - ::send(_fdIntrWrite, &c, 1, 0); + if (::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR) + { + if (interrupted()) + { + goto repeat; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } #else - ::write(_fdIntrWrite, &c, 1); + if (::write(_fdIntrWrite, &c, 1) == -1) + { + if (interrupted()) + { + goto repeat; + } + + SystemException ex(__FILE__, __LINE__); + ex.error = getSystemErrno(); + throw ex; + } #endif } @@ -288,49 +351,82 @@ IceInternal::ThreadPool::run() } EventHandlerPtr handler; - std::pair<SOCKET, bool> remove(INVALID_SOCKET, false); + bool finished = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if (_destroyed) + + if (FD_ISSET(_fdIntrRead, &fdSet)) { // - // Don't clear the interrupt fd if destroyed, so that - // the other threads exit as well. + // There are three possiblities for an interrupt: // - return; - } + // - The thread pool has been destroyed. + // + // - Server shutdown has been initiated. + // + // - An event handler was registered or unregistered. + // + + // + // Thread pool destroyed? + // + if (_destroyed) + { + // + // Don't clear the interrupt if destroyed, so that + // the other threads exit as well. + // + return; + } + + shutdown = clearInterrupt(); - if (!_adds.empty()) - { // - // New handlers have been added. + // Server shutdown? // - for (vector<pair<SOCKET, EventHandlerPtr> >::iterator p = _adds.begin(); p != _adds.end(); ++p) + if (shutdown) { - _handlerMap.insert(*p); - FD_SET(p->first, &_fdSet); - _maxFd = max(_maxFd, p->first); - _minFd = min(_minFd, p->first); + goto repeatSelect; } - _adds.clear(); - } - - if (!_removes.empty()) - { + // - // Handlers are permanently removed. + // An event handler must have been registered or + // unregistered. // - remove = _removes.front(); - _removes.pop_front(); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); - assert(p != _handlerMap.end()); - FD_CLR(p->first, &_fdSet); - handler = p->second; + assert(!_changes.empty()); + pair<SOCKET, EventHandlerPtr> change = _changes.front(); + _changes.pop_front(); + + if (change.second) // Addition if handler is set. + { + _handlerMap.insert(change); + FD_SET(change.first, &_fdSet); + _maxFd = max(_maxFd, change.first); + _minFd = min(_minFd, change.first); + goto repeatSelect; + } + else // Removal if handler is not set. + { + map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); + assert(p != _handlerMap.end()); + handler = p->second; + finished = true; + _handlerMap.erase(p); + FD_CLR(change.first, &_fdSet); + _maxFd = _fdIntrRead; + _minFd = _fdIntrRead; + if (!_handlerMap.empty()) + { + _maxFd = max(_maxFd, (--_handlerMap.end())->first); + _minFd = min(_minFd, _handlerMap.begin()->first); + } + // Don't goto repeatSelect; we have to call + // finished() on the event handler below, outside + // the thread synchronization. + } } - - if (!handler) + else { // // Optimization for WIN32 specific version of fd_set. Looping with a @@ -400,11 +496,7 @@ IceInternal::ThreadPool::run() } #endif - if (_lastFd == _fdIntrRead) - { - shutdown = clearInterrupt(); - goto repeatSelect; - } + assert(_lastFd != _fdIntrRead); map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) @@ -420,34 +512,27 @@ IceInternal::ThreadPool::run() assert(handler); - if (remove.first != INVALID_SOCKET) + if (finished) { // - // Call finished() on a handler if necessary. + // Notify a handler about it's removal from the thread + // pool. // - if (remove.second) - { - handler->finished(); - } - + handler->finished(); + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(remove.first); - assert(p != _handlerMap.end()); - _handlerMap.erase(p); - _maxFd = _fdIntrRead; - _minFd = _fdIntrRead; - if (!_handlerMap.empty()) - { - _maxFd = max(_maxFd, (--_handlerMap.end())->first); - _minFd = min(_minFd, _handlerMap.begin()->first); - } if (handler->server()) { + assert(_servers > 0); --_servers; } - if (_handlerMap.empty() || _servers == 0) + else + { + assert(_clients > 0); + --_clients; + } + if (_clients == 0 || _servers == 0) { notifyAll(); // For waitUntil...Finished() methods. } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 90674028b28..464bbb3711e 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -21,7 +21,7 @@ #include <Ice/LoggerF.h> #include <Ice/PropertiesF.h> #include <Ice/EventHandlerF.h> -#include <deque> +#include <list> #ifndef _WIN32 # define SOCKET int @@ -37,7 +37,9 @@ class ThreadPool : public ::IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mu public: void _register(SOCKET, const EventHandlerPtr&); - void unregister(SOCKET, bool); + void unregister(SOCKET); + void serverIsNowClient(); + void clientIsNowServer(); void promoteFollower(); void initiateServerShutdown(); // Signal-safe shutdown initiation. void waitUntilServerFinished(); @@ -54,7 +56,7 @@ private: friend class Instance; bool clearInterrupt(); - void setInterrupt(); + void setInterrupt(char); void run(); void read(const EventHandlerPtr&); @@ -69,9 +71,9 @@ private: SOCKET _fdIntrRead; SOCKET _fdIntrWrite; fd_set _fdSet; - std::vector<std::pair<SOCKET, EventHandlerPtr> > _adds; - std::deque<std::pair<SOCKET, bool> > _removes; + std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal. std::map<SOCKET, EventHandlerPtr> _handlerMap; + int _clients; int _servers; int _timeout; ::IceUtil::Mutex _threadMutex; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 2a0d90c645a..e5b74f31dbd 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -177,6 +177,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s try { _fd = createSocket(true); + setBlock(_fd, false); getAddress(host, port, _addr); doConnect(_fd, _addr, -1); _connect = false; // We're connected now @@ -206,6 +207,7 @@ IceInternal::UdpTransceiver::UdpTransceiver(const InstancePtr& instance, const s try { _fd = createSocket(true); + setBlock(_fd, false); getAddress(host, port, _addr); doBind(_fd, _addr); |