summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2002-04-18 19:52:58 +0000
committerMarc Laukien <marc@zeroc.com>2002-04-18 19:52:58 +0000
commit9d5a45489ae73aba5ce15767037df8377aeab91b (patch)
tree55e63caf4fcc9c5c46eb700618cfb7d31a902053 /cpp
parentminor update. (diff)
downloadice-9d5a45489ae73aba5ce15767037df8377aeab91b.tar.bz2
ice-9d5a45489ae73aba5ce15767037df8377aeab91b.tar.xz
ice-9d5a45489ae73aba5ce15767037df8377aeab91b.zip
massive thread pool changes
Diffstat (limited to 'cpp')
-rw-r--r--cpp/config/Make.rules4
-rw-r--r--cpp/demo/Ice/nested/Client.cpp6
-rw-r--r--cpp/doc/Properties.sgml30
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp8
-rw-r--r--cpp/src/Ice/CommunicatorI.h13
-rw-r--r--cpp/src/Ice/Connection.cpp105
-rw-r--r--cpp/src/Ice/Connection.h10
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp82
-rw-r--r--cpp/src/Ice/ConnectionFactory.h9
-rw-r--r--cpp/src/Ice/EventHandler.h12
-rw-r--r--cpp/src/Ice/Instance.cpp88
-rw-r--r--cpp/src/Ice/Instance.h9
-rw-r--r--cpp/src/Ice/ThreadPool.cpp132
-rw-r--r--cpp/src/Ice/ThreadPool.h15
14 files changed, 256 insertions, 267 deletions
diff --git a/cpp/config/Make.rules b/cpp/config/Make.rules
index 75b86f9e542..00d0ff29905 100644
--- a/cpp/config/Make.rules
+++ b/cpp/config/Make.rules
@@ -38,8 +38,8 @@ slicedir = $(top_srcdir)/slice
# set different compilation options for debug or optimization
#
CXX = c++
-CXXFLAGS = -g -ftemplate-depth-128 -fPIC -Wall
-#CXXFLAGS = -O3 -DNDEBUG -ftemplate-depth-128 -fPIC -Wall
+#CXXFLAGS = -g -ftemplate-depth-128 -fPIC -Wall
+CXXFLAGS = -O3 -DNDEBUG -ftemplate-depth-128 -fPIC -Wall
CPPFLAGS = -I$(includedir) -I$(STLPORT_HOME)/include/stlport
LDFLAGS = -L$(libdir) -L$(STLPORT_HOME)/lib -L$(OPENSSL_HOME)/lib -L$(XERCESC_HOME)/lib
BASELIBS = -lIceUtil -lstlport_gcc -lpthread -luuid -ldl
diff --git a/cpp/demo/Ice/nested/Client.cpp b/cpp/demo/Ice/nested/Client.cpp
index 11a37001a26..d915035b543 100644
--- a/cpp/demo/Ice/nested/Client.cpp
+++ b/cpp/demo/Ice/nested/Client.cpp
@@ -54,9 +54,9 @@ NestedClient::run(int argc, char* argv[])
adapter->activate();
cout << "Note: The maximum nesting level is (sz - 1) * 2, with sz\n"
- << "being the number of threads in the thread pool. if you\n"
- << "specify a value higher than that, the application will block\n"
- << "or timeout.\n"
+ << "being the number of threads in the server thread pool. if\n"
+ << "you specify a value higher than that, the application will\n"
+ << "block or timeout.\n"
<< endl;
string s;
diff --git a/cpp/doc/Properties.sgml b/cpp/doc/Properties.sgml
index 07a37835afa..5142c9f5341 100644
--- a/cpp/doc/Properties.sgml
+++ b/cpp/doc/Properties.sgml
@@ -179,24 +179,7 @@ has been created with
<section><title>&Ice; Thread Pool Properties</title>
<!-- ********************************************************************** -->
-<section><title>Ice.ThreadPool.MaxConnections</title>
-<section><title>Synopsis</title>
-<synopsis>
-Ice.ThreadPool.MaxConnections=<replaceable>num</replaceable>
-</synopsis>
-</section>
-<section>
-<title>Description</title>
-<para>
-The maximum number of connections the thread pool will use. Default is
-zero, meaning no limit. If <replaceable>num</replaceable> is not set
-to zero, it must be set to a value larger than the number of threads
-in the thread pool. <note><para>Currently not implemented!</para></note>
-</para>
-</section>
-</section>
-
-<section><title>Ice.ThreadPool.Size</title>
+<section><title>Ice.ClientThreadPool.Size, Ice.ServerThreadPool.Size</title>
<section><title>Synopsis</title>
<synopsis>
Ice.ThreadPool.Size=<replaceable>num</replaceable>
@@ -205,8 +188,15 @@ Ice.ThreadPool.Size=<replaceable>num</replaceable>
<section>
<title>Description</title>
<para>
-The number of threads in the thread pool. Default is 10.
-<replaceable>num</replaceable> must be larger than zero.
+The number of threads in the client and server thread pools. Default
+is one thread for the client thread pool, and 10 threads for the
+server thread pool. The number of threads in each pool must be larger
+than zero. <note><para> At present, there is no need to set the number
+of threads in the client thread pool to a value larger than
+one. Having multiple threads will only be of benefit with future
+versions of &Ice; that support callbacks on the client, which are
+invoked when a response from the server is received (for asynchronous
+method invocations). </para></note>
</para>
</section>
</section>
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index bc98c080681..bf46e3d3544 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -45,7 +45,7 @@ Ice::CommunicatorI::shutdown()
//
// No mutex locking here! This operation must be signal-safe.
//
- _threadPool->initiateServerShutdown();
+ _serverThreadPool->initiateShutdown();
}
void
@@ -55,7 +55,7 @@ Ice::CommunicatorI::waitForShutdown()
// No mutex locking here, otherwise the communicator is blocked
// while waiting for shutdown.
//
- _threadPool->waitUntilServerFinished();
+ _serverThreadPool->waitUntilFinished();
}
ObjectPrx
@@ -258,9 +258,9 @@ Ice::CommunicatorI::CommunicatorI(const PropertiesPtr& properties)
//
// See the comments in the header file for an explanation of why we
- // need _threadPool directly in CommunicatorI.
+ // need _serverThreadPool directly in CommunicatorI.
//
- _threadPool = _instance->threadPool();
+ _serverThreadPool = _instance->serverThreadPool();
}
Ice::CommunicatorI::~CommunicatorI()
diff --git a/cpp/src/Ice/CommunicatorI.h b/cpp/src/Ice/CommunicatorI.h
index 7c82efbbcf2..6e1c19598a2 100644
--- a/cpp/src/Ice/CommunicatorI.h
+++ b/cpp/src/Ice/CommunicatorI.h
@@ -68,13 +68,14 @@ private:
::IceInternal::InstancePtr _instance;
//
- // We need _threadPool directly in CommunicatorI, and it must
- // never be set to null. That's because the shutdown() operation
- // is signal-safe, and thus must not access any mutex locks or
- // _instance. It may only access _threadPool->initiateShutdown(),
- // which is signal-safe as well.
+ // We need _serverThreadPool directly in CommunicatorI, and it
+ // must never be set to null. That's because the shutdown()
+ // operation is signal-safe, and thus must not access any mutex
+ // locks or _instance. It may only access
+ // _serverThreadPool->initiateShutdown(), which is signal-safe as
+ // well.
//
- ::IceInternal::ThreadPoolPtr _threadPool;
+ ::IceInternal::ThreadPoolPtr _serverThreadPool;
};
}
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 6ae51a0d9eb..5920e58d1c3 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -252,26 +252,31 @@ void
IceInternal::Connection::setAdapter(const ObjectAdapterPtr& adapter)
{
IceUtil::RecMutex::Lock sync(*this);
-
+
//
- // In closed and holding state, we are not registered with the
- // thread pool. For all other states, we have to notify the thread
- // pool in case this event handler changed from a client to a
- // server or vice versa.
+ // We are registered with a thread pool in active and closing
+ // mode. However, we only change subscription if we're in active
+ // mode, and thus ignore closing mode here.k
//
- if (_state != StateHolding && _state != StateClosed)
+ if (_state == StateActive)
{
if (adapter && !_adapter)
{
- _threadPool->clientIsNowServer();
+ //
+ // Client is now server.
+ //
+ unregisterWithPool();
}
if (!adapter && _adapter)
{
- _threadPool->serverIsNowClient();
+ //
+ // Server is now client.
+ //
+ unregisterWithPool();
}
}
-
+
_adapter = adapter;
}
@@ -283,13 +288,6 @@ IceInternal::Connection::getAdapter() const
}
bool
-IceInternal::Connection::server() const
-{
- IceUtil::RecMutex::Lock sync(*this);
- return _adapter;
-}
-
-bool
IceInternal::Connection::readable() const
{
return true;
@@ -302,7 +300,7 @@ IceInternal::Connection::read(BasicStream& stream)
}
void
-IceInternal::Connection::message(BasicStream& stream)
+IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
bool invoke = false;
bool batch = false;
@@ -310,7 +308,7 @@ IceInternal::Connection::message(BasicStream& stream)
{
IceUtil::RecMutex::Lock sync(*this);
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
if (_state == StateClosed)
{
@@ -555,15 +553,17 @@ IceInternal::Connection::message(BasicStream& stream)
}
void
-IceInternal::Connection::finished()
+IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
IceUtil::RecMutex::Lock sync(*this);
- assert(_state == StateClosed || _state == StateHolding);
-
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
- if (_state == StateClosed)
+ if (_state == StateActive || _state == StateClosing)
+ {
+ registerWithPool();
+ }
+ else if (_state == StateClosed)
{
_transceiver->close();
}
@@ -578,7 +578,7 @@ IceInternal::Connection::exception(const LocalException& ex)
/*
bool
-IceInternal::Connection::tryDestroy()
+IceInternal::Connection::tryDestroy(const ThreadPoolPtr& threadPool)
{
bool isLocked = trylock();
if(!isLocked)
@@ -586,7 +586,7 @@ IceInternal::Connection::tryDestroy()
return false;
}
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
try
{
@@ -611,12 +611,11 @@ IceInternal::Connection::Connection(const InstancePtr& instance,
_transceiver(transceiver),
_endpoint(endpoint),
_adapter(adapter),
- _threadPool(instance->threadPool()),
- _logger(instance->logger()),
- _traceLevels(instance->traceLevels()),
+ _logger(_instance->logger()),
+ _traceLevels(_instance->traceLevels()),
_nextRequestId(1),
_requestsHint(_requests.end()),
- _batchStream(instance),
+ _batchStream(_instance),
_responseCount(0),
_proxyUsageCount(0),
_state(StateHolding)
@@ -713,7 +712,7 @@ IceInternal::Connection::setState(State state)
{
return;
}
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
break;
}
@@ -723,7 +722,7 @@ IceInternal::Connection::setState(State state)
{
return;
}
- _threadPool->unregister(_transceiver->fd());
+ unregisterWithPool();
break;
}
@@ -738,7 +737,7 @@ IceInternal::Connection::setState(State state)
//
// We need to continue to read data in closing state.
//
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
}
break;
}
@@ -752,9 +751,9 @@ IceInternal::Connection::setState(State state)
// register again before we unregister, so that
// finished() is called correctly.
//
- _threadPool->_register(_transceiver->fd(), this);
+ registerWithPool();
}
- _threadPool->unregister(_transceiver->fd());
+ unregisterWithPool();
break;
}
}
@@ -787,3 +786,41 @@ IceInternal::Connection::closeConnection()
_transceiver->write(os, _endpoint->timeout());
_transceiver->shutdown();
}
+
+void
+IceInternal::Connection::registerWithPool()
+{
+ if (_adapter)
+ {
+ if (!_serverThreadPool)
+ {
+ _serverThreadPool = _instance->serverThreadPool();
+ assert(_serverThreadPool);
+ }
+ _serverThreadPool->_register(_transceiver->fd(), this);
+ }
+ else
+ {
+ if (!_clientThreadPool)
+ {
+ _clientThreadPool = _instance->clientThreadPool();
+ assert(_clientThreadPool);
+ }
+ _clientThreadPool->_register(_transceiver->fd(), this);
+ }
+}
+
+void
+IceInternal::Connection::unregisterWithPool()
+{
+ if (_adapter)
+ {
+ assert(_serverThreadPool);
+ _serverThreadPool->unregister(_transceiver->fd());
+ }
+ else
+ {
+ assert(_clientThreadPool);
+ _clientThreadPool->unregister(_transceiver->fd());
+ }
+}
diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h
index 18f95a38230..de623c0187e 100644
--- a/cpp/src/Ice/Connection.h
+++ b/cpp/src/Ice/Connection.h
@@ -60,11 +60,10 @@ public:
//
// Operations from EventHandler
//
- virtual bool server() const;
virtual bool readable() const;
virtual void read(BasicStream&);
- virtual void message(BasicStream&);
- virtual void finished();
+ virtual void message(BasicStream&, const ThreadPoolPtr&);
+ virtual void finished(const ThreadPoolPtr&);
virtual void exception(const ::Ice::LocalException&);
private:
@@ -91,13 +90,16 @@ private:
void setState(State, const ::Ice::LocalException&);
void setState(State);
void closeConnection();
+ void registerWithPool();
+ void unregisterWithPool();
TransceiverPtr _transceiver;
EndpointPtr _endpoint;
::Ice::ObjectAdapterPtr _adapter;
- ThreadPoolPtr _threadPool;
::Ice::LoggerPtr _logger;
TraceLevelsPtr _traceLevels;
+ ThreadPoolPtr _clientThreadPool;
+ ThreadPoolPtr _serverThreadPool;
::Ice::Int _nextRequestId;
std::map< ::Ice::Int, Outgoing*> _requests;
std::map< ::Ice::Int, Outgoing*>::iterator _requestsHint;
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 62070f0cde4..fae96436b60 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -282,12 +282,6 @@ IceInternal::IncomingConnectionFactory::connections() const
}
bool
-IceInternal::IncomingConnectionFactory::server() const
-{
- return true;
-}
-
-bool
IceInternal::IncomingConnectionFactory::readable() const
{
return false;
@@ -300,11 +294,11 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&)
}
void
-IceInternal::IncomingConnectionFactory::message(BasicStream&)
+IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
IceUtil::Mutex::Lock sync(*this);
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
if (_state != StateActive)
{
@@ -354,18 +348,18 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&)
}
void
-IceInternal::IncomingConnectionFactory::finished()
+IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool)
{
IceUtil::Mutex::Lock sync(*this);
- assert(_state == StateClosed || _state == StateHolding);
-
- _threadPool->promoteFollower();
+ threadPool->promoteFollower();
- if (_state == StateClosed)
+ if (_state == StateActive)
+ {
+ registerWithPool();
+ }
+ else if (_state == StateClosed)
{
- assert(_connections.empty());
-
try
{
//
@@ -407,7 +401,7 @@ IceInternal::IncomingConnectionFactory::exception(const LocalException&)
/*
bool
-IceInternal::IncomingConnectionFactory::tryDestroy()
+IceInternal::IncomingConnectionFactory::tryDestroy(const ThreadPoolPtr&)
{
//
// Do nothing. We don't want collector factories to be closed by
@@ -440,7 +434,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_acceptor = _endpoint->acceptor(_endpoint);
assert(_acceptor);
_acceptor->listen();
- _threadPool = _instance->threadPool();
}
}
catch (...)
@@ -478,12 +471,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
-
- if (_threadPool)
- {
- _threadPool->_register(_acceptor->fd(), this);
- }
-
+ registerWithPool();
for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::activate));
break;
}
@@ -494,30 +482,22 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
-
- if (_threadPool)
- {
- _threadPool->unregister(_acceptor->fd());
- }
-
+ unregisterWithPool();
for_each(_connections.begin(), _connections.end(), ::Ice::voidMemFun(&Connection::hold));
break;
}
case StateClosed:
{
- if (_threadPool)
+ //
+ // If we come from holding state, we first need to
+ // register again before we unregister.
+ //
+ if (_state == StateHolding)
{
- //
- // If we come from holding state, we first need to
- // register again before we unregister.
- //
- if (_state == StateHolding)
- {
- _threadPool->_register(_acceptor->fd(), this);
- }
- _threadPool->unregister(_acceptor->fd());
+ registerWithPool();
}
+ unregisterWithPool();
#ifdef _STLP_BEGIN_NAMESPACE
// voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h
@@ -535,3 +515,27 @@ IceInternal::IncomingConnectionFactory::setState(State state)
_state = state;
}
+
+void
+IceInternal::IncomingConnectionFactory::registerWithPool()
+{
+ if (_acceptor)
+ {
+ if (!_serverThreadPool)
+ {
+ _serverThreadPool = _instance->serverThreadPool();
+ assert(_serverThreadPool);
+ }
+ _serverThreadPool->_register(_acceptor->fd(), this);
+ }
+}
+
+void
+IceInternal::IncomingConnectionFactory::unregisterWithPool()
+{
+ if (_acceptor)
+ {
+ assert(_serverThreadPool);
+ _serverThreadPool->unregister(_acceptor->fd());
+ }
+}
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 7b962c895cb..9dec534693a 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -67,11 +67,10 @@ public:
//
// Operations from EventHandler
//
- virtual bool server() const;
virtual bool readable() const;
virtual void read(BasicStream&);
- virtual void message(BasicStream&);
- virtual void finished();
+ virtual void message(BasicStream&, const ThreadPoolPtr&);
+ virtual void finished(const ThreadPoolPtr&);
virtual void exception(const ::Ice::LocalException&);
private:
@@ -89,12 +88,14 @@ private:
};
void setState(State);
+ void registerWithPool();
+ void unregisterWithPool();
EndpointPtr _endpoint;
::Ice::ObjectAdapterPtr _adapter;
- ThreadPoolPtr _threadPool;
AcceptorPtr _acceptor;
TransceiverPtr _transceiver;
+ ThreadPoolPtr _serverThreadPool;
std::list<ConnectionPtr> _connections;
State _state;
bool _warn;
diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h
index 4d5434b579c..7951b43ea2c 100644
--- a/cpp/src/Ice/EventHandler.h
+++ b/cpp/src/Ice/EventHandler.h
@@ -32,12 +32,6 @@ class EventHandler : public ::IceUtil::Shared
public:
//
- // Returns true if the event handler belongs to the server-side of
- // an application. Client-side otherwise.
- //
- virtual bool server() const = 0;
-
- //
// Return true if read() must be called before calling message().
//
virtual bool readable() const = 0;
@@ -51,14 +45,14 @@ public:
//
// A complete message has been received.
//
- virtual void message(BasicStream&) = 0;
+ virtual void message(BasicStream&, const ThreadPoolPtr&) = 0;
//
// Will be called if the event handler is finally
// unregistered. (Calling unregister() does not unregister
// immediately.)
//
- virtual void finished() = 0;
+ virtual void finished(const ThreadPoolPtr&) = 0;
//
// Propagate an exception to the event handler.
@@ -70,7 +64,7 @@ public:
// handler cannot be destroyed because it is in use, or true
// otherwise.
//
-// virtual bool tryDestroy() = 0;
+// virtual bool tryDestroy(const ThreadPoolPtr&) = 0;
protected:
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index 92b97b5ab33..3a22390c446 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -73,7 +73,7 @@ void IceInternal::decRef(Instance* p) { p->__decRef(); }
CommunicatorPtr
IceInternal::Instance::communicator()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _communicator;
}
@@ -87,14 +87,14 @@ IceInternal::Instance::properties()
LoggerPtr
IceInternal::Instance::logger()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _logger;
}
void
IceInternal::Instance::logger(const LoggerPtr& logger)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
_logger = logger;
}
@@ -128,57 +128,82 @@ IceInternal::Instance::getSslSystem()
RouterManagerPtr
IceInternal::Instance::routerManager()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _routerManager;
}
ReferenceFactoryPtr
IceInternal::Instance::referenceFactory()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _referenceFactory;
}
ProxyFactoryPtr
IceInternal::Instance::proxyFactory()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _proxyFactory;
}
OutgoingConnectionFactoryPtr
IceInternal::Instance::outgoingConnectionFactory()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _outgoingConnectionFactory;
}
ObjectFactoryManagerPtr
IceInternal::Instance::servantFactoryManager()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _servantFactoryManager;
}
UserExceptionFactoryManagerPtr
IceInternal::Instance::userExceptionFactoryManager()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _userExceptionFactoryManager;
}
ObjectAdapterFactoryPtr
IceInternal::Instance::objectAdapterFactory()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
return _objectAdapterFactory;
}
ThreadPoolPtr
-IceInternal::Instance::threadPool()
+IceInternal::Instance::clientThreadPool()
{
- IceUtil::Mutex::Lock sync(*this);
- return _threadPool;
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if (_communicator) // Not destroyed?
+ {
+ if (!_clientThreadPool) // Lazy initialization.
+ {
+ _clientThreadPool = new ThreadPool(this, false);
+ }
+ }
+
+ return _clientThreadPool;
+}
+
+ThreadPoolPtr
+IceInternal::Instance::serverThreadPool()
+{
+ IceUtil::RecMutex::Lock sync(*this);
+
+ if (_communicator) // Not destroyed?
+ {
+ if (!_serverThreadPool) // Lazy initialization.
+ {
+ _serverThreadPool = new ThreadPool(this, true);
+ }
+ }
+
+ return _serverThreadPool;
}
IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const PropertiesPtr& properties) :
@@ -342,10 +367,14 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope
}
//
- // Thread pool initialization must be done after daemon() is
+ // Thread pool initializations must be done after daemon() is
// called, since daemon() forks.
//
- _threadPool = new ThreadPool(this);
+
+ //
+ // Thread pool initialization is now lazy initialization in
+ // clientThreadPool() and serverThreadPool().
+ //
__setNoDelete(false);
}
catch(...)
@@ -366,7 +395,8 @@ IceInternal::Instance::~Instance()
assert(!_servantFactoryManager);
assert(!_userExceptionFactoryManager);
assert(!_objectAdapterFactory);
- assert(!_threadPool);
+ assert(!_clientThreadPool);
+ assert(!_serverThreadPool);
assert(!_routerManager);
assert(!_sslSystem);
@@ -408,10 +438,11 @@ IceInternal::Instance::~Instance()
void
IceInternal::Instance::destroy()
{
- ThreadPoolPtr threadPool;
+ ThreadPoolPtr clientThreadPool;
+ ThreadPoolPtr serverThreadPool;
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::RecMutex::Lock sync(*this);
//
// Destroy all contained objects. Then set all references to null,
@@ -479,14 +510,23 @@ IceInternal::Instance::destroy()
// We destroy the thread pool outside the thread
// synchronization.
//
- threadPool = _threadPool;
- _threadPool = 0;
+ clientThreadPool = _clientThreadPool;
+ _clientThreadPool = 0;
+ serverThreadPool = _serverThreadPool;
+ _serverThreadPool = 0;
}
- if (threadPool)
+ if (clientThreadPool)
+ {
+ clientThreadPool->waitUntilFinished();
+ clientThreadPool->destroy();
+ clientThreadPool->joinWithAllThreads();
+ }
+
+ if (serverThreadPool)
{
- threadPool->waitUntilFinished();
- threadPool->destroy();
- threadPool->joinWithAllThreads();
+ serverThreadPool->waitUntilFinished();
+ serverThreadPool->destroy();
+ serverThreadPool->joinWithAllThreads();
}
}
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index baf800e648f..7b8cd41c553 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -13,6 +13,7 @@
#include <IceUtil/Shared.h>
#include <IceUtil/Mutex.h>
+#include <IceUtil/RecMutex.h>
#include <Ice/InstanceF.h>
#include <Ice/CommunicatorF.h>
#include <Ice/PropertiesF.h>
@@ -40,7 +41,7 @@ class CommunicatorI;
namespace IceInternal
{
-class Instance : public ::IceUtil::Shared, public ::IceUtil::Mutex
+class Instance : public ::IceUtil::Shared, public ::IceUtil::RecMutex
{
public:
@@ -56,7 +57,8 @@ public:
ObjectFactoryManagerPtr servantFactoryManager();
UserExceptionFactoryManagerPtr userExceptionFactoryManager();
ObjectAdapterFactoryPtr objectAdapterFactory();
- ThreadPoolPtr threadPool();
+ ThreadPoolPtr clientThreadPool();
+ ThreadPoolPtr serverThreadPool();
std::string defaultProtocol();
std::string defaultHost();
::IceSSL::SystemInternalPtr getSslSystem();
@@ -79,7 +81,8 @@ private:
ObjectFactoryManagerPtr _servantFactoryManager;
UserExceptionFactoryManagerPtr _userExceptionFactoryManager;
ObjectAdapterFactoryPtr _objectAdapterFactory;
- ThreadPoolPtr _threadPool;
+ ThreadPoolPtr _clientThreadPool;
+ ThreadPoolPtr _serverThreadPool;
std::string _defaultProtocol; // Immutable, not reset by destroy().
std::string _defaultHost; // Immutable, not reset by destroy().
::IceSSL::SystemInternalPtr _sslSystem;
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index fcee3afb792..07dedbead26 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -30,14 +30,7 @@ void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (handler->server())
- {
- ++_servers;
- }
- else
- {
- ++_clients;
- }
+ ++_handlers;
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
}
@@ -51,70 +44,30 @@ IceInternal::ThreadPool::unregister(SOCKET fd)
}
void
-IceInternal::ThreadPool::serverIsNowClient()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- ++_clients;
- assert(_servers > 0);
- --_servers;
- if (_servers == 0)
- {
- notifyAll(); // For waitUntil...Finished() methods.
- }
-}
-
-void
-IceInternal::ThreadPool::clientIsNowServer()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- ++_servers;
- assert(_clients > 0);
- --_clients;
-}
-
-void
IceInternal::ThreadPool::promoteFollower()
{
_threadMutex.unlock();
}
void
-IceInternal::ThreadPool::initiateServerShutdown()
+IceInternal::ThreadPool::initiateShutdown()
{
setInterrupt(1);
}
void
-IceInternal::ThreadPool::waitUntilServerFinished()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while (_servers != 0 && _threadNum != 0)
- {
- wait();
- }
-
- if (_servers != 0)
- {
- Error out(_logger);
- out << "can't wait for graceful server termination in thread pool\n"
- << "since all threads have vanished";
- }
-}
-
-void
IceInternal::ThreadPool::waitUntilFinished()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while (_clients + _servers != 0 && _threadNum != 0)
+ while (_handlers != 0 && _threadNum != 0)
{
wait();
}
- if (_clients + _servers != 0)
+ if (_handlers != 0)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "can't wait for graceful application termination in thread pool\n"
<< "since all threads have vanished";
}
@@ -139,35 +92,11 @@ IceInternal::ThreadPool::joinWithAllThreads()
}
}
-void
-IceInternal::ThreadPool::setMaxConnections(int maxConnections)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (maxConnections < _threadNum + 1 && maxConnections != 0)
- {
- _maxConnections = _threadNum + 1;
- }
- else
- {
- _maxConnections = maxConnections;
- }
-}
-
-int
-IceInternal::ThreadPool::getMaxConnections()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- return _maxConnections;
-}
-
-IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
+IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, bool server) :
_instance(instance),
- _logger(_instance->logger()),
- _properties(_instance->properties()),
_destroyed(false),
_lastFd(INVALID_SOCKET),
- _clients(0),
- _servers(0),
+ _handlers(0),
_timeout(0)
{
SOCKET fds[2];
@@ -181,8 +110,16 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
_maxFd = _fdIntrRead;
_minFd = _fdIntrRead;
- _timeout = _properties->getPropertyAsInt("Ice.ServerIdleTime");
- _threadNum = _properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10);
+ if (server)
+ {
+ _timeout = _instance->properties()->getPropertyAsInt("Ice.ServerIdleTime");
+ _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10);
+ }
+ else
+ {
+ _threadNum = _instance->properties()->getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 1);
+ }
+
if (_threadNum < 1)
{
_threadNum = 1;
@@ -205,9 +142,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance) :
destroy();
throw;
}
-
- // Must be called after _threadNum is set.
- setMaxConnections(_properties->getPropertyAsInt("Ice.ThreadPool.MaxConnections"));
}
IceInternal::ThreadPool::~ThreadPool()
@@ -297,12 +231,13 @@ repeat:
void
IceInternal::ThreadPool::run()
{
+ ThreadPoolPtr self = this;
bool shutdown = false;
while (true)
{
_threadMutex.lock();
-
+
repeatSelect:
if (shutdown) // Shutdown has been initiated.
@@ -438,7 +373,7 @@ IceInternal::ThreadPool::run()
//
if (fdSet.fd_count == 0)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
goto repeatSelect;
}
@@ -490,7 +425,7 @@ IceInternal::ThreadPool::run()
if (loops > 1)
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "select() in thread pool returned " << ret << " but no filedescriptor is readable";
goto repeatSelect;
}
@@ -501,7 +436,7 @@ IceInternal::ThreadPool::run()
map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
if(p == _handlerMap.end())
{
- Error out(_logger);
+ Error out(_instance->logger());
out << "filedescriptor " << _lastFd << " not registered with the thread pool";
goto repeatSelect;
}
@@ -518,23 +453,14 @@ IceInternal::ThreadPool::run()
// Notify a handler about it's removal from the thread
// pool.
//
- handler->finished();
+ handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if (handler->server())
- {
- assert(_servers > 0);
- --_servers;
- }
- else
- {
- assert(_clients > 0);
- --_clients;
- }
- if (_clients == 0 || _servers == 0)
+ assert(_handlers > 0);
+ if (--_handlers == 0)
{
- notifyAll(); // For waitUntil...Finished() methods.
+ notifyAll(); // For waitUntilFinished().
}
}
}
@@ -564,7 +490,7 @@ IceInternal::ThreadPool::run()
assert(stream.i == stream.b.end());
}
- handler->message(stream);
+ handler->message(stream, self); // "self" is faster than "this", as the reference count is not modified.
}
}
}
@@ -640,12 +566,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
}
catch (const Exception& ex)
{
- Error out(_pool->_logger);
+ Error out(_pool->_instance->logger());
out << "exception in thread pool:\n" << ex;
}
catch (...)
{
- Error out(_pool->_logger);
+ Error out(_pool->_instance->logger());
out << "unknown exception in thread pool";
}
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 464bbb3711e..7effc029dfa 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -38,19 +38,14 @@ public:
void _register(SOCKET, const EventHandlerPtr&);
void unregister(SOCKET);
- void serverIsNowClient();
- void clientIsNowServer();
void promoteFollower();
- void initiateServerShutdown(); // Signal-safe shutdown initiation.
- void waitUntilServerFinished();
+ void initiateShutdown(); // Signal-safe shutdown initiation.
void waitUntilFinished();
void joinWithAllThreads();
- void setMaxConnections(int);
- int getMaxConnections();
private:
- ThreadPool(const InstancePtr&);
+ ThreadPool(const InstancePtr&, bool);
virtual ~ThreadPool();
void destroy();
friend class Instance;
@@ -62,8 +57,6 @@ private:
void read(const EventHandlerPtr&);
InstancePtr _instance;
- ::Ice::LoggerPtr _logger;
- ::Ice::PropertiesPtr _properties;
bool _destroyed;
SOCKET _maxFd;
SOCKET _minFd;
@@ -73,8 +66,7 @@ private:
fd_set _fdSet;
std::list<std::pair<SOCKET, EventHandlerPtr> > _changes; // Event handler set for addition; null for removal.
std::map<SOCKET, EventHandlerPtr> _handlerMap;
- int _clients;
- int _servers;
+ int _handlers;
int _timeout;
::IceUtil::Mutex _threadMutex;
@@ -93,7 +85,6 @@ private:
std::vector<IceUtil::ThreadControl> _threads; // Control for all threads, running or not.
int _threadNum; // Number of running threads.
- int _maxConnections; // Maximum number of connections. If set to zero, the number of connections is not limited.
};
}