summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp48
-rw-r--r--cpp/src/Ice/ConnectionI.cpp16
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp50
-rw-r--r--cpp/src/Ice/ThreadPool.cpp43
-rw-r--r--cpp/src/Ice/ThreadPool.h4
5 files changed, 115 insertions, 46 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 6b989962dd7..4f49e28403f 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -789,12 +789,15 @@ IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ // XXX: must promoteFollower be inside or outside the mutex?
threadPool->promoteFollower();
+ assert(threadPool.get() == dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool().get());
--_finishedCount;
if(_finishedCount == 0 && _state == StateClosed)
{
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse();
_acceptor->close();
_acceptor = 0;
notifyAll();
@@ -877,10 +880,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
assert(_acceptor);
_acceptor->listen();
- if(_instance->threadPerConnection())
+ __setNoDelete(true);
+ try
{
- __setNoDelete(true);
- try
+ if(_instance->threadPerConnection())
{
//
// If we are in thread per connection mode, we also use
@@ -890,27 +893,32 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
_threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize());
}
- catch(const IceUtil::Exception& ex)
+ else
{
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for incoming connection factory:\n" << ex;
- }
-
- try
- {
- _acceptor->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
-
- __setNoDelete(false);
- ex.ice_throw();
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse();
+ }
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ if(_instance->threadPerConnection())
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for incoming connection factory:\n" << ex;
+ }
+
+ try
+ {
+ _acceptor->close();
}
+ catch(const LocalException&)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
__setNoDelete(false);
+ ex.ice_throw();
}
+ __setNoDelete(false);
}
}
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 1597636cea6..9f7dea5d0c2 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1451,9 +1451,12 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
--_finishedCount;
+ assert(threadPool.get() == _threadPool.get());
if(_finishedCount == 0 && _state == StateClosed)
{
+ _threadPool->decFdsInUse();
+
//
// We must make sure that nobody is sending when we close
// the transceiver.
@@ -1606,8 +1609,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
}
int& compressionLevel = const_cast<int&>(_compressionLevel);
- compressionLevel = \
- _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
+ compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
+ "Ice.Compression.Level", 1);
if(compressionLevel < 1)
{
compressionLevel = 1;
@@ -1642,6 +1645,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
{
const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
}
+ _threadPool->incFdsInUse();
}
else
{
@@ -1671,7 +1675,11 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
{
out << "cannot create thread for connection:\n" << ex;
}
- else
+ //
+ // If the _threadPool member variable is set then
+ // ThreadPool::incFdsInUse() failed.
+ //
+ else if(!_threadPool)
{
out << "cannot create thread pool for connection:\n" << ex;
}
@@ -1859,6 +1867,8 @@ Ice::ConnectionI::setState(State state)
//
assert(!_registeredWithPool);
+ _threadPool->decFdsInUse();
+
//
// We must make sure that nobody is sending when we
// close the transceiver.
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index 27805c6a4e9..9338f35bd70 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -777,6 +777,33 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica
__setNoDelete(true);
try
{
+ // First create the per-adapter thread pool, if
+ // necessary. This is done before the creation of the incoming
+ // connection factory as the thread pool is needed during
+ // creation for the call to incFdsInUse.
+ if(!_instance->threadPerConnection())
+ {
+ if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() ||
+ !properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty())
+ {
+ int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size");
+ int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax");
+ if(size > 0 || sizeMax > 0)
+ {
+ _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0);
+ }
+ }
+ else
+ {
+ int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size");
+ int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax");
+ if(size > 0 || sizeMax > 0)
+ {
+ _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0);
+ }
+ }
+ }
+
if(!router)
{
const_cast<RouterPrx&>(router) = RouterPrx::uncheckedCast(
@@ -890,29 +917,6 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica
{
setLocator(_instance->referenceFactory()->getDefaultLocator());
}
-
- if(!_instance->threadPerConnection())
- {
- if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() ||
- !properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty())
- {
- int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size");
- int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax");
- if(size > 0 || sizeMax > 0)
- {
- _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0);
- }
- }
- else
- {
- int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size");
- int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax");
- if(size > 0 || sizeMax > 0)
- {
- _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0);
- }
- }
- }
}
catch(...)
{
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 5b16ee748eb..5240889b537 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -60,6 +60,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_minFd = _fdIntrRead;
#if defined(_WIN32)
+ _fdsInUse = 1; // _fdIntrRead is always in use.
FD_ZERO(&_fdSet);
FD_SET(_fdIntrRead, &_fdSet);
#elif defined(__linux)
@@ -210,6 +211,48 @@ IceInternal::ThreadPool::destroy()
}
void
+IceInternal::ThreadPool::incFdsInUse()
+{
+ // This is windows specific since every other platform uses an API
+ // that doesn't have a specific FD limit.
+#ifdef _WIN32
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(!_destroyed);
+ if(_fdsInUse + 1 > FD_SETSIZE)
+ {
+ Warning warn(_instance->initializationData().logger);
+ warn << "maximum number of connections exceeded";
+
+ //
+ // No appropriate errno.
+ //
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+ ++_fdsInUse;
+#endif
+}
+
+void
+IceInternal::ThreadPool::decFdsInUse()
+{
+ // This is windows specific since every other platform uses an API
+ // that doesn't have a specific FD limit.
+#ifdef _WIN32
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(!_destroyed);
+ if(_fdsInUse <= 1)
+ {
+ Trace trace(_instance->initializationData().logger, "ThreadPool");
+ trace << _prefix << ": about to assert";
+ }
+ assert(_fdsInUse > 1); // _fdIntrRead is always in use.
+ --_fdsInUse;
+#endif
+}
+
+void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 84ee595ceb4..a0ecc923a46 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -48,6 +48,9 @@ public:
void destroy();
+ void incFdsInUse();
+ void decFdsInUse();
+
void _register(SOCKET, const EventHandlerPtr&);
void unregister(SOCKET);
void promoteFollower();
@@ -74,6 +77,7 @@ private:
SOCKET _fdIntrWrite;
#if defined(_WIN32)
fd_set _fdSet;
+ int _fdsInUse;
#elif defined(__linux)
int _epollFd;
std::vector<struct epoll_event> _events;