diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 48 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 16 | ||||
-rw-r--r-- | cpp/src/Ice/ObjectAdapterI.cpp | 50 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 43 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 4 |
5 files changed, 115 insertions, 46 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 6b989962dd7..4f49e28403f 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -789,12 +789,15 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // XXX: must promoteFollower be inside or outside the mutex? threadPool->promoteFollower(); + assert(threadPool.get() == dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool().get()); --_finishedCount; if(_finishedCount == 0 && _state == StateClosed) { + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse(); _acceptor->close(); _acceptor = 0; notifyAll(); @@ -877,10 +880,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance assert(_acceptor); _acceptor->listen(); - if(_instance->threadPerConnection()) + __setNoDelete(true); + try { - __setNoDelete(true); - try + if(_instance->threadPerConnection()) { // // If we are in thread per connection mode, we also use @@ -890,27 +893,32 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize()); } - catch(const IceUtil::Exception& ex) + else { - { - Error out(_instance->initializationData().logger); - out << "cannot create thread for incoming connection factory:\n" << ex; - } - - try - { - _acceptor->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - __setNoDelete(false); - ex.ice_throw(); + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse(); + } + } + catch(const IceUtil::Exception& ex) + { + if(_instance->threadPerConnection()) + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for incoming connection factory:\n" << ex; + } + + try + { + _acceptor->close(); } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + __setNoDelete(false); + ex.ice_throw(); } + __setNoDelete(false); } } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 1597636cea6..9f7dea5d0c2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1451,9 +1451,12 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); --_finishedCount; + assert(threadPool.get() == _threadPool.get()); if(_finishedCount == 0 && _state == StateClosed) { + _threadPool->decFdsInUse(); + // // We must make sure that nobody is sending when we close // the transceiver. @@ -1606,8 +1609,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, } int& compressionLevel = const_cast<int&>(_compressionLevel); - compressionLevel = \ - _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1); + compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( + "Ice.Compression.Level", 1); if(compressionLevel < 1) { compressionLevel = 1; @@ -1642,6 +1645,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, { const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); } + _threadPool->incFdsInUse(); } else { @@ -1671,7 +1675,11 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, { out << "cannot create thread for connection:\n" << ex; } - else + // + // If the _threadPool member variable is set then + // ThreadPool::incFdsInUse() failed. + // + else if(!_threadPool) { out << "cannot create thread pool for connection:\n" << ex; } @@ -1859,6 +1867,8 @@ Ice::ConnectionI::setState(State state) // assert(!_registeredWithPool); + _threadPool->decFdsInUse(); + // // We must make sure that nobody is sending when we // close the transceiver. diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 27805c6a4e9..9338f35bd70 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -777,6 +777,33 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica __setNoDelete(true); try { + // First create the per-adapter thread pool, if + // necessary. This is done before the creation of the incoming + // connection factory as the thread pool is needed during + // creation for the call to incFdsInUse. + if(!_instance->threadPerConnection()) + { + if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() || + !properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty()) + { + int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size"); + int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax"); + if(size > 0 || sizeMax > 0) + { + _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0); + } + } + else + { + int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size"); + int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax"); + if(size > 0 || sizeMax > 0) + { + _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0); + } + } + } + if(!router) { const_cast<RouterPrx&>(router) = RouterPrx::uncheckedCast( @@ -890,29 +917,6 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica { setLocator(_instance->referenceFactory()->getDefaultLocator()); } - - if(!_instance->threadPerConnection()) - { - if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() || - !properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty()) - { - int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size"); - int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax"); - if(size > 0 || sizeMax > 0) - { - _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0); - } - } - else - { - int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size"); - int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax"); - if(size > 0 || sizeMax > 0) - { - _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0); - } - } - } } catch(...) { diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 5b16ee748eb..5240889b537 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -60,6 +60,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _minFd = _fdIntrRead; #if defined(_WIN32) + _fdsInUse = 1; // _fdIntrRead is always in use. FD_ZERO(&_fdSet); FD_SET(_fdIntrRead, &_fdSet); #elif defined(__linux) @@ -210,6 +211,48 @@ IceInternal::ThreadPool::destroy() } void +IceInternal::ThreadPool::incFdsInUse() +{ + // This is windows specific since every other platform uses an API + // that doesn't have a specific FD limit. +#ifdef _WIN32 + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + if(_fdsInUse + 1 > FD_SETSIZE) + { + Warning warn(_instance->initializationData().logger); + warn << "maximum number of connections exceeded"; + + // + // No appropriate errno. + // + SocketException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + ++_fdsInUse; +#endif +} + +void +IceInternal::ThreadPool::decFdsInUse() +{ + // This is windows specific since every other platform uses an API + // that doesn't have a specific FD limit. +#ifdef _WIN32 + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + if(_fdsInUse <= 1) + { + Trace trace(_instance->initializationData().logger, "ThreadPool"); + trace << _prefix << ": about to assert"; + } + assert(_fdsInUse > 1); // _fdIntrRead is always in use. + --_fdsInUse; +#endif +} + +void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 84ee595ceb4..a0ecc923a46 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -48,6 +48,9 @@ public: void destroy(); + void incFdsInUse(); + void decFdsInUse(); + void _register(SOCKET, const EventHandlerPtr&); void unregister(SOCKET); void promoteFollower(); @@ -74,6 +77,7 @@ private: SOCKET _fdIntrWrite; #if defined(_WIN32) fd_set _fdSet; + int _fdsInUse; #elif defined(__linux) int _epollFd; std::vector<struct epoll_event> _events; |