diff options
author | Marc Laukien <marc@zeroc.com> | 2002-04-18 19:52:58 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2002-04-18 19:52:58 +0000 |
commit | 9d5a45489ae73aba5ce15767037df8377aeab91b (patch) | |
tree | 55e63caf4fcc9c5c46eb700618cfb7d31a902053 /cpp | |
parent | minor update. (diff) | |
download | ice-9d5a45489ae73aba5ce15767037df8377aeab91b.tar.bz2 ice-9d5a45489ae73aba5ce15767037df8377aeab91b.tar.xz ice-9d5a45489ae73aba5ce15767037df8377aeab91b.zip |
massive thread pool changes
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/config/Make.rules | 4 | ||||
-rw-r--r-- | cpp/demo/Ice/nested/Client.cpp | 6 | ||||
-rw-r--r-- | cpp/doc/Properties.sgml | 30 | ||||
-rw-r--r-- | cpp/src/Ice/CommunicatorI.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/CommunicatorI.h | 13 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 105 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.h | 10 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 82 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 9 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.h | 12 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 88 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.h | 9 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 132 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 15 |
14 files changed, 256 insertions, 267 deletions
diff --git a/cpp/config/Make.rules b/cpp/config/Make.rules index 75b86f9e542..00d0ff29905 100644 --- a/cpp/config/Make.rules +++ b/cpp/config/Make.rules @@ -38,8 +38,8 @@ slicedir = $(top_srcdir)/slice # set different compilation options for debug or optimization # CXX = c++ -CXXFLAGS = -g -ftemplate-depth-128 -fPIC -Wall -#CXXFLAGS = -O3 -DNDEBUG -ftemplate-depth-128 -fPIC -Wall +#CXXFLAGS = -g -ftemplate-depth-128 -fPIC -Wall +CXXFLAGS = -O3 -DNDEBUG -ftemplate-depth-128 -fPIC -Wall CPPFLAGS = -I$(includedir) -I$(STLPORT_HOME)/include/stlport LDFLAGS = -L$(libdir) -L$(STLPORT_HOME)/lib -L$(OPENSSL_HOME)/lib -L$(XERCESC_HOME)/lib BASELIBS = -lIceUtil -lstlport_gcc -lpthread -luuid -ldl diff --git a/cpp/demo/Ice/nested/Client.cpp b/cpp/demo/Ice/nested/Client.cpp index 11a37001a26..d915035b543 100644 --- a/cpp/demo/Ice/nested/Client.cpp +++ b/cpp/demo/Ice/nested/Client.cpp @@ -54,9 +54,9 @@ NestedClient::run(int argc, char* argv[]) adapter->activate(); cout << "Note: The maximum nesting level is (sz - 1) * 2, with sz\n" - << "being the number of threads in the thread pool. if you\n" - << "specify a value higher than that, the application will block\n" - << "or timeout.\n" + << "being the number of threads in the server thread pool. if\n" + << "you specify a value higher than that, the application will\n" + << "block or timeout.\n" << endl; string s; diff --git a/cpp/doc/Properties.sgml b/cpp/doc/Properties.sgml index 07a37835afa..5142c9f5341 100644 --- a/cpp/doc/Properties.sgml +++ b/cpp/doc/Properties.sgml @@ -179,24 +179,7 @@ has been created with <section><title>&Ice; Thread Pool Properties</title> <!-- ********************************************************************** --> -<section><title>Ice.ThreadPool.MaxConnections</title> -<section><title>Synopsis</title> -<synopsis> -Ice.ThreadPool.MaxConnections=<replaceable>num</replaceable> -</synopsis> -</section> -<section> -<title>Description</title> -<para> -The maximum number of connections the thread pool will use. Default is -zero, meaning no limit. If <replaceable>num</replaceable> is not set -to zero, it must be set to a value larger than the number of threads -in the thread pool. <note><para>Currently not implemented!</para></note> -</para> -</section> -</section> - -<section><title>Ice.ThreadPool.Size</title> +<section><title>Ice.ClientThreadPool.Size, Ice.ServerThreadPool.Size</title> <section><title>Synopsis</title> <synopsis> Ice.ThreadPool.Size=<replaceable>num</replaceable> @@ -205,8 +188,15 @@ Ice.ThreadPool.Size=<replaceable>num</replaceable> <section> <title>Description</title> <para> -The number of threads in the thread pool. Default is 10. -<replaceable>num</replaceable> must be larger than zero. +The number of threads in the client and server thread pools. Default +is one thread for the client thread pool, and 10 threads for the +server thread pool. The number of threads in each pool must be larger +than zero. <note><para> At present, there is no need to set the number +of threads in the client thread pool to a value larger than +one. Having multiple threads will only be of benefit with future +versions of &Ice; that support callbacks on the client, which are +invoked when a response from the server is received (for asynchronous +method invocations). </para></note> </para> </section> </section> diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index bc98c080681..bf46e3d3544 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -45,7 +45,7 @@ Ice::CommunicatorI::shutdown() // // No mutex locking here! This operation must be signal-safe. // - _threadPool->initiateServerShutdown(); + _serverThreadPool->initiateShutdown(); } void @@ -55,7 +55,7 @@ Ice::CommunicatorI::waitForShutdown() // No mutex locking here, otherwise the communicator is blocked // while waiting for shutdown. // - _threadPool->waitUntilServerFinished(); + _serverThreadPool->waitUntilFinished(); } ObjectPrx @@ -258,9 +258,9 @@ Ice::CommunicatorI::CommunicatorI(const PropertiesPtr& properties) // // See the comments in the header file for an explanation of why we - // need _threadPool directly in CommunicatorI. + // need _serverThreadPool directly in CommunicatorI. // - _threadPool = _instance->threadPool(); + _serverThreadPool = _instance->serverThreadPool(); } Ice::CommunicatorI::~CommunicatorI() diff --git a/cpp/src/Ice/CommunicatorI.h b/cpp/src/Ice/CommunicatorI.h index 7c82efbbcf2..6e1c19598a2 100644 --- a/cpp/src/Ice/CommunicatorI.h +++ b/cpp/src/Ice/CommunicatorI.h @@ -68,13 +68,14 @@ private: ::IceInternal::InstancePtr _instance; // - // We need _threadPool directly in CommunicatorI, and it must - // never be set to null. That's because the shutdown() operation - // is signal-safe, and thus must not access any mutex locks or - // _instance. It may only access _threadPool->initiateShutdown(), - // which is signal-safe as well. + // We need _serverThreadPool directly in CommunicatorI, and it + // must never be set to null. That's because the shutdown() + // operation is signal-safe, and thus must not access any mutex + // locks or _instance. It may only access + // _serverThreadPool->initiateShutdown(), which is signal-safe as + // well. // - ::IceInternal::ThreadPoolPtr _threadPool; + ::IceInternal::ThreadPoolPtr _serverThreadPool; }; } diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 6ae51a0d9eb..5920e58d1c3 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -252,26 +252,31 @@ void IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter) { IceUtil::RecMutex::Lock sync(*this); - + // - // In closed and holding state, we are not registered with the - // thread pool. For all other states, we have to notify the thread - // pool in case this event handler changed from a client to a - // server or vice versa. + // We are registered with a thread pool in active and closing + // mode. However, we only change subscription if we're in active + // mode, and thus ignore closing mode here.k // - if (_state != StateHolding && _state != StateClosed) + if (_state == StateActive) { if (adapter && !_adapter) { - _threadPool->clientIsNowServer(); + // + // Client is now server. + // + unregisterWithPool(); } if (!adapter && _adapter) { - _threadPool->serverIsNowClient(); + // + // Server is now client. + // + unregisterWithPool(); } } - + _adapter = adapter; } @@ -283,13 +288,6 @@ IceInternal::Connection::getAdapter() const } bool -IceInternal::Connection::server() const -{ - IceUtil::RecMutex::Lock sync(*this); - return _adapter; -} - -bool IceInternal::Connection::readable() const { return true; @@ -302,7 +300,7 @@ IceInternal::Connection::read(BasicStream& stream) } void -IceInternal::Connection::message(BasicStream& stream) +IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { bool invoke = false; bool batch = false; @@ -310,7 +308,7 @@ IceInternal::Connection::message(BasicStream& stream) { IceUtil::RecMutex::Lock sync(*this); - _threadPool->promoteFollower(); + threadPool->promoteFollower(); if (_state == StateClosed) { @@ -555,15 +553,17 @@ IceInternal::Connection::message(BasicStream& stream) } void -IceInternal::Connection::finished() +IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) { IceUtil::RecMutex::Lock sync(*this); - assert(_state == StateClosed || _state == StateHolding); - - _threadPool->promoteFollower(); + threadPool->promoteFollower(); - if (_state == StateClosed) + if (_state == StateActive || _state == StateClosing) + { + registerWithPool(); + } + else if (_state == StateClosed) { _transceiver->close(); } @@ -578,7 +578,7 @@ IceInternal::Connection::exception(const LocalException& ex) /* bool -IceInternal::Connection::tryDestroy() +IceInternal::Connection::tryDestroy(const ThreadPoolPtr& threadPool) { bool isLocked = trylock(); if(!isLocked) @@ -586,7 +586,7 @@ IceInternal::Connection::tryDestroy() return false; } - _threadPool->promoteFollower(); + threadPool->promoteFollower(); try { @@ -611,12 +611,11 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _transceiver(transceiver), _endpoint(endpoint), _adapter(adapter), - _threadPool(instance->threadPool()), - _logger(instance->logger()), - _traceLevels(instance->traceLevels()), + _logger(_instance->logger()), + _traceLevels(_instance->traceLevels()), _nextRequestId(1), _requestsHint(_requests.end()), - _batchStream(instance), + _batchStream(_instance), _responseCount(0), _proxyUsageCount(0), _state(StateHolding) @@ -713,7 +712,7 @@ IceInternal::Connection::setState(State state) { return; } - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); break; } @@ -723,7 +722,7 @@ IceInternal::Connection::setState(State state) { return; } - _threadPool->unregister(_transceiver->fd()); + unregisterWithPool(); break; } @@ -738,7 +737,7 @@ IceInternal::Connection::setState(State state) // // We need to continue to read data in closing state. // - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); } break; } @@ -752,9 +751,9 @@ IceInternal::Connection::setState(State state) // register again before we unregister, so that // finished() is called correctly. // - _threadPool->_register(_transceiver->fd(), this); + registerWithPool(); } - _threadPool->unregister(_transceiver->fd()); + unregisterWithPool(); break; } } @@ -787,3 +786,41 @@ IceInternal::Connection::closeConnection() _transceiver->write(os, _endpoint->timeout()); _transceiver->shutdown(); } + +void +IceInternal::Connection::registerWithPool() +{ + if (_adapter) + { + if (!_serverThreadPool) + { + _serverThreadPool = _instance->serverThreadPool(); + assert(_serverThreadPool); + } + _serverThreadPool->_register(_transceiver->fd(), this); + } + else + { + if (!_clientThreadPool) + { + _clientThreadPool = _instance->clientThreadPool(); + assert(_clientThreadPool); + } + _clientThreadPool->_register(_transceiver->fd(), this); + } +} + +void +IceInternal::Connection::unregisterWithPool() +{ + if (_adapter) + { + assert(_serverThreadPool); + _serverThreadPool->unregister(_transceiver->fd()); + } + else + { + assert(_clientThreadPool); + _clientThreadPool->unregister(_transceiver->fd()); + } +} diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h index 18f95a38230..de623c0187e 100644 --- a/cpp/src/Ice/Connection.h +++ b/cpp/src/Ice/Connection.h @@ -60,11 +60,10 @@ public: // // Operations from EventHandler // - virtual bool server() const; virtual bool readable() const; virtual void read(BasicStream&); - virtual void message(BasicStream&); - virtual void finished(); + virtual void message(BasicStream&, const ThreadPoolPtr&); + virtual void finished(const ThreadPoolPtr&); virtual void exception(const ::Ice::LocalException&); private: @@ -91,13 +90,16 @@ private: void setState(State, const ::Ice::LocalException&); void setState(State); void closeConnection(); + void registerWithPool(); + void unregisterWithPool(); TransceiverPtr _transceiver; EndpointPtr _endpoint; ::Ice::ObjectAdapterPtr _adapter; - ThreadPoolPtr _threadPool; ::Ice::LoggerPtr _logger; TraceLevelsPtr _traceLevels; + ThreadPoolPtr _clientThreadPool; + ThreadPoolPtr _serverThreadPool; ::Ice::Int _nextRequestId; std::map< ::Ice::Int, Outgoing*> _requests; std::map< ::Ice::Int, Outgoing*>::iterator _requestsHint; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 62070f0cde4..fae96436b60 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -282,12 +282,6 @@ IceInternal::IncomingConnectionFactory::connections() const } bool -IceInternal::IncomingConnectionFactory::server() const -{ - return true; -} - -bool IceInternal::IncomingConnectionFactory::readable() const { return false; @@ -300,11 +294,11 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&) } void -IceInternal::IncomingConnectionFactory::message(BasicStream&) +IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { IceUtil::Mutex::Lock sync(*this); - _threadPool->promoteFollower(); + threadPool->promoteFollower(); if (_state != StateActive) { @@ -354,18 +348,18 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&) } void -IceInternal::IncomingConnectionFactory::finished() +IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool) { IceUtil::Mutex::Lock sync(*this); - assert(_state == StateClosed || _state == StateHolding); - - _threadPool->promoteFollower(); + threadPool->promoteFollower(); - if (_state == StateClosed) + if (_state == StateActive) + { + registerWithPool(); + } + else if (_state == StateClosed) { - assert(_connections.empty()); - try { // @@ -407,7 +401,7 @@ IceInternal::IncomingConnectionFactory::exception(const LocalException&) /* bool -IceInternal::IncomingConnectionFactory::tryDestroy() +IceInternal::IncomingConnectionFactory::tryDestroy(const ThreadPoolPtr&) { // // Do nothing. We don't want collector factories to be closed by @@ -440,7 +434,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _acceptor = _endpoint->acceptor(_endpoint); assert(_acceptor); _acceptor->listen(); - _threadPool = _instance->threadPool(); } } catch (...) @@ -478,12 +471,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - - if (_threadPool) - { - _threadPool->_register(_acceptor->fd(), this); - } - + registerWithPool(); for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::activate)); break; } @@ -494,30 +482,22 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - - if (_threadPool) - { - _threadPool->unregister(_acceptor->fd()); - } - + unregisterWithPool(); for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::hold)); break; } case StateClosed: { - if (_threadPool) + // + // If we come from holding state, we first need to + // register again before we unregister. + // + if (_state == StateHolding) { - // - // If we come from holding state, we first need to - // register again before we unregister. - // - if (_state == StateHolding) - { - _threadPool->_register(_acceptor->fd(), this); - } - _threadPool->unregister(_acceptor->fd()); + registerWithPool(); } + unregisterWithPool(); #ifdef _STLP_BEGIN_NAMESPACE // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h @@ -535,3 +515,27 @@ IceInternal::IncomingConnectionFactory::setState(State state) _state = state; } + +void +IceInternal::IncomingConnectionFactory::registerWithPool() +{ + if (_acceptor) + { + if (!_serverThreadPool) + { + _serverThreadPool = _instance->serverThreadPool(); + assert(_serverThreadPool); + } + _serverThreadPool->_register(_acceptor->fd(), this); + } +} + +void +IceInternal::IncomingConnectionFactory::unregisterWithPool() +{ + if (_acceptor) + { + assert(_serverThreadPool); + _serverThreadPool->unregister(_acceptor->fd()); + } +} diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 7b962c895cb..9dec534693a 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -67,11 +67,10 @@ public: // // Operations from EventHandler // - virtual bool server() const; virtual bool readable() const; virtual void read(BasicStream&); - virtual void message(BasicStream&); - virtual void finished(); + virtual void message(BasicStream&, const ThreadPoolPtr&); + virtual void finished(const ThreadPoolPtr&); virtual void exception(const ::Ice::LocalException&); private: @@ -89,12 +88,14 @@ private: }; void setState(State); + void registerWithPool(); + void unregisterWithPool(); EndpointPtr _endpoint; ::Ice::ObjectAdapterPtr _adapter; - ThreadPoolPtr _threadPool; AcceptorPtr _acceptor; TransceiverPtr _transceiver; + ThreadPoolPtr _serverThreadPool; std::list<ConnectionPtr> _connections; State _state; bool _warn; diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index 4d5434b579c..7951b43ea2c 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -32,12 +32,6 @@ class EventHandler : public ::IceUtil::Shared public: // - // Returns true if the event handler belongs to the server-side of - // an application. Client-side otherwise. - // - virtual bool server() const = 0; - - // // Return true if read() must be called before calling message(). // virtual bool readable() const = 0; @@ -51,14 +45,14 @@ public: // // A complete message has been received. // - virtual void message(BasicStream&) = 0; + virtual void message(BasicStream&, const ThreadPoolPtr&) = 0; // // Will be called if the event handler is finally // unregistered. (Calling unregister() does not unregister // immediately.) // - virtual void finished() = 0; + virtual void finished(const ThreadPoolPtr&) = 0; // // Propagate an exception to the event handler. @@ -70,7 +64,7 @@ public: // handler cannot be destroyed because it is in use, or true // otherwise. // -// virtual bool tryDestroy() = 0; +// virtual bool tryDestroy(const ThreadPoolPtr&) = 0; protected: diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 92b97b5ab33..3a22390c446 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -73,7 +73,7 @@ void IceInternal::decRef(Instance* p) { p->__decRef(); } CommunicatorPtr IceInternal::Instance::communicator() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _communicator; } @@ -87,14 +87,14 @@ IceInternal::Instance::properties() LoggerPtr IceInternal::Instance::logger() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _logger; } void IceInternal::Instance::logger(const LoggerPtr& logger) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); _logger = logger; } @@ -128,57 +128,82 @@ IceInternal::Instance::getSslSystem() RouterManagerPtr IceInternal::Instance::routerManager() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _routerManager; } ReferenceFactoryPtr IceInternal::Instance::referenceFactory() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _referenceFactory; } ProxyFactoryPtr IceInternal::Instance::proxyFactory() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _proxyFactory; } OutgoingConnectionFactoryPtr IceInternal::Instance::outgoingConnectionFactory() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _outgoingConnectionFactory; } ObjectFactoryManagerPtr IceInternal::Instance::servantFactoryManager() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _servantFactoryManager; } UserExceptionFactoryManagerPtr IceInternal::Instance::userExceptionFactoryManager() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _userExceptionFactoryManager; } ObjectAdapterFactoryPtr IceInternal::Instance::objectAdapterFactory() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); return _objectAdapterFactory; } ThreadPoolPtr -IceInternal::Instance::threadPool() +IceInternal::Instance::clientThreadPool() { - IceUtil::Mutex::Lock sync(*this); - return _threadPool; + IceUtil::RecMutex::Lock sync(*this); + + if (_communicator) // Not destroyed? + { + if (!_clientThreadPool) // Lazy initialization. + { + _clientThreadPool = new ThreadPool(this, false); + } + } + + return _clientThreadPool; +} + +ThreadPoolPtr +IceInternal::Instance::serverThreadPool() +{ + IceUtil::RecMutex::Lock sync(*this); + + if (_communicator) // Not destroyed? + { + if (!_serverThreadPool) // Lazy initialization. + { + _serverThreadPool = new ThreadPool(this, true); + } + } + + return _serverThreadPool; } IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const PropertiesPtr& properties) : @@ -342,10 +367,14 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope } // - // Thread pool initialization must be done after daemon() is + // Thread pool initializations must be done after daemon() is // called, since daemon() forks. // - _threadPool = new ThreadPool(this); + + // + // Thread pool initialization is now lazy initialization in + // clientThreadPool() and serverThreadPool(). + // __setNoDelete(false); } catch(...) @@ -366,7 +395,8 @@ IceInternal::Instance::~Instance() assert(!_servantFactoryManager); assert(!_userExceptionFactoryManager); assert(!_objectAdapterFactory); - assert(!_threadPool); + assert(!_clientThreadPool); + assert(!_serverThreadPool); assert(!_routerManager); assert(!_sslSystem); @@ -408,10 +438,11 @@ IceInternal::Instance::~Instance() void IceInternal::Instance::destroy() { - ThreadPoolPtr threadPool; + ThreadPoolPtr clientThreadPool; + ThreadPoolPtr serverThreadPool; { - IceUtil::Mutex::Lock sync(*this); + IceUtil::RecMutex::Lock sync(*this); // // Destroy all contained objects. Then set all references to null, @@ -479,14 +510,23 @@ IceInternal::Instance::destroy() // We destroy the thread pool outside the thread // synchronization. // - threadPool = _threadPool; - _threadPool = 0; + clientThreadPool = _clientThreadPool; + _clientThreadPool = 0; + serverThreadPool = _serverThreadPool; + _serverThreadPool = 0; } - if (threadPool) + if (clientThreadPool) + { + clientThreadPool->waitUntilFinished(); + clientThreadPool->destroy(); + clientThreadPool->joinWithAllThreads(); + } + + if (serverThreadPool) { - threadPool->waitUntilFinished(); - threadPool->destroy(); - threadPool->joinWithAllThreads(); + serverThreadPool->waitUntilFinished(); + serverThreadPool->destroy(); + serverThreadPool->joinWithAllThreads(); } } diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index baf800e648f..7b8cd41c553 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -13,6 +13,7 @@ #include <IceUtil/Shared.h> #include <IceUtil/Mutex.h> +#include <IceUtil/RecMutex.h> #include <Ice/InstanceF.h> #include <Ice/CommunicatorF.h> #include <Ice/PropertiesF.h> @@ -40,7 +41,7 @@ class CommunicatorI; namespace IceInternal { -class Instance : public ::IceUtil::Shared, public ::IceUtil::Mutex +class Instance : public ::IceUtil::Shared, public ::IceUtil::RecMutex { public: @@ -56,7 +57,8 @@ public: ObjectFactoryManagerPtr servantFactoryManager(); UserExceptionFactoryManagerPtr userExceptionFactoryManager(); ObjectAdapterFactoryPtr objectAdapterFactory(); - ThreadPoolPtr threadPool(); + ThreadPoolPtr clientThreadPool(); + ThreadPoolPtr serverThreadPool(); std::string defaultProtocol(); std::string defaultHost(); ::IceSSL::SystemInternalPtr getSslSystem(); @@ -79,7 +81,8 @@ private: ObjectFactoryManagerPtr _servantFactoryManager; UserExceptionFactoryManagerPtr _userExceptionFactoryManager; ObjectAdapterFactoryPtr _objectAdapterFactory; - ThreadPoolPtr _threadPool; + ThreadPoolPtr _clientThreadPool; + ThreadPoolPtr _serverThreadPool; std::string _defaultProtocol; // Immutable, not reset by destroy(). std::string _defaultHost; // Immutable, not reset by destroy(). ::IceSSL::SystemInternalPtr _sslSystem; diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index fcee3afb792..07dedbead26 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -30,14 +30,7 @@ void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (handler->server()) - { - ++_servers; - } - else - { - ++_clients; - } + ++_handlers; _changes.push_back(make_pair(fd, handler)); setInterrupt(0); } @@ -51,70 +44,30 @@ IceInternal::ThreadPool::unregister(SOCKET fd) } void -IceInternal::ThreadPool::serverIsNowClient() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ++_clients; - assert(_servers > 0); - --_servers; - if (_servers == 0) - { - notifyAll(); // For waitUntil...Finished() methods. - } -} - -void -IceInternal::ThreadPool::clientIsNowServer() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - ++_servers; - assert(_clients > 0); - --_clients; -} - -void IceInternal::ThreadPool::promoteFollower() { _threadMutex.unlock(); } void -IceInternal::ThreadPool::initiateServerShutdown() +IceInternal::ThreadPool::initiateShutdown() { setInterrupt(1); } void -IceInternal::ThreadPool::waitUntilServerFinished() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while (_servers != 0 && _threadNum != 0) - { - wait(); - } - - if (_servers != 0) - { - Error out(_logger); - out << "can't wait for graceful server termination in thread pool\n" - << "since all threads have vanished"; - } -} - -void IceInternal::ThreadPool::waitUntilFinished() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while (_clients + _servers != 0 && _threadNum != 0) + while (_handlers != 0 && _threadNum != 0) { wait(); } - if (_clients + _servers != 0) + if (_handlers != 0) { - Error out(_logger); + Error out(_instance->logger()); out << "can't wait for graceful application termination in thread pool\n" << "since all threads have vanished"; } @@ -139,35 +92,11 @@ IceInternal::ThreadPool::joinWithAllThreads() } } -void -IceInternal::ThreadPool::setMaxConnections(int maxConnections) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (maxConnections < _threadNum + 1 && maxConnections != 0) - { - _maxConnections = _threadNum + 1; - } - else - { - _maxConnections = maxConnections; - } -} - -int -IceInternal::ThreadPool::getMaxConnections() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - return _maxConnections; -} - -IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : +IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) : _instance(instance), - _logger(_instance->logger()), - _properties(_instance->properties()), _destroyed(false), _lastFd(INVALID_SOCKET), - _clients(0), - _servers(0), + _handlers(0), _timeout(0) { SOCKET fds[2]; @@ -181,8 +110,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : _maxFd = _fdIntrRead; _minFd = _fdIntrRead; - _timeout = _properties->getPropertyAsInt("Ice.ServerIdleTime"); - _threadNum = _properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10); + if (server) + { + _timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime"); + _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10); + } + else + { + _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 1); + } + if (_threadNum < 1) { _threadNum = 1; @@ -205,9 +142,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) : destroy(); throw; } - - // Must be called after _threadNum is set. - setMaxConnections(_properties->getPropertyAsInt("Ice.ThreadPool.MaxConnections")); } IceInternal::ThreadPool::~ThreadPool() @@ -297,12 +231,13 @@ repeat: void IceInternal::ThreadPool::run() { + ThreadPoolPtr self = this; bool shutdown = false; while (true) { _threadMutex.lock(); - + repeatSelect: if (shutdown) // Shutdown has been initiated. @@ -438,7 +373,7 @@ IceInternal::ThreadPool::run() // if (fdSet.fd_count == 0) { - Error out(_logger); + Error out(_instance->logger()); out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; goto repeatSelect; } @@ -490,7 +425,7 @@ IceInternal::ThreadPool::run() if (loops > 1) { - Error out(_logger); + Error out(_instance->logger()); out << "select() in thread pool returned " << ret << " but no filedescriptor is readable"; goto repeatSelect; } @@ -501,7 +436,7 @@ IceInternal::ThreadPool::run() map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd); if(p == _handlerMap.end()) { - Error out(_logger); + Error out(_instance->logger()); out << "filedescriptor " << _lastFd << " not registered with the thread pool"; goto repeatSelect; } @@ -518,23 +453,14 @@ IceInternal::ThreadPool::run() // Notify a handler about it's removal from the thread // pool. // - handler->finished(); + handler->finished(self); // "self" is faster than "this", as the reference count is not modified. { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if (handler->server()) - { - assert(_servers > 0); - --_servers; - } - else - { - assert(_clients > 0); - --_clients; - } - if (_clients == 0 || _servers == 0) + assert(_handlers > 0); + if (--_handlers == 0) { - notifyAll(); // For waitUntil...Finished() methods. + notifyAll(); // For waitUntilFinished(). } } } @@ -564,7 +490,7 @@ IceInternal::ThreadPool::run() assert(stream.i == stream.b.end()); } - handler->message(stream); + handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified. } } } @@ -640,12 +566,12 @@ IceInternal::ThreadPool::EventHandlerThread::run() } catch (const Exception& ex) { - Error out(_pool->_logger); + Error out(_pool->_instance->logger()); out << "exception in thread pool:\n" << ex; } catch (...) { - Error out(_pool->_logger); + Error out(_pool->_instance->logger()); out << "unknown exception in thread pool"; } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 464bbb3711e..7effc029dfa 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -38,19 +38,14 @@ public: void _register(SOCKET, const EventHandlerPtr&); void unregister(SOCKET); - void serverIsNowClient(); - void clientIsNowServer(); void promoteFollower(); - void initiateServerShutdown(); // Signal-safe shutdown initiation. - void waitUntilServerFinished(); + void initiateShutdown(); // Signal-safe shutdown initiation. void waitUntilFinished(); void joinWithAllThreads(); - void setMaxConnections(int); - int getMaxConnections(); private: - ThreadPool(const InstancePtr&); + ThreadPool(const InstancePtr&, bool); virtual ~ThreadPool(); void destroy(); friend class Instance; @@ -62,8 +57,6 @@ private: void read(const EventHandlerPtr&); InstancePtr _instance; - ::Ice::LoggerPtr _logger; - ::Ice::PropertiesPtr _properties; bool _destroyed; SOCKET _maxFd; SOCKET _minFd; @@ -73,8 +66,7 @@ private: fd_set _fdSet; std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal. std::map<SOCKET, EventHandlerPtr> _handlerMap; - int _clients; - int _servers; + int _handlers; int _timeout; ::IceUtil::Mutex _threadMutex; @@ -93,7 +85,6 @@ private: std::vector<IceUtil::ThreadControl> _threads; // Control for all threads, running or not. int _threadNum; // Number of running threads. - int _maxConnections; // Maximum number of connections. If set to zero, the number of connections is not limited. }; } |