summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp65
1 files changed, 39 insertions, 26 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index fd1a188387c..b82c780e1a2 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -93,7 +93,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
ConnectionIPtr
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, bool& compress)
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts,
+ bool threadPerConnection, bool& compress)
{
assert(!endpts.empty());
vector<EndpointIPtr> endpoints = endpts;
@@ -158,9 +159,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
{
//
// Don't return connections for which destruction has
- // been initiated.
+ // been initiated. The connection must also match the
+ // requested thread-per-connection setting.
//
- if(!pr.first->second->isDestroyed())
+ if(!pr.first->second->isDestroyed() &&
+ pr.first->second->threadPerConnection() == threadPerConnection)
{
if(_instance->defaultsAndOverrides()->overrideCompress)
{
@@ -222,11 +225,13 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
while(pr.first != pr.second)
{
- //
- // Don't return connections for which destruction has
- // been initiated.
- //
- if(!pr.first->second->isDestroyed())
+ //
+ // Don't return connections for which destruction has
+ // been initiated. The connection must also match the
+ // requested thread-per-connection setting.
+ //
+ if(!pr.first->second->isDestroyed() &&
+ pr.first->second->threadPerConnection() == threadPerConnection)
{
if(_instance->defaultsAndOverrides()->overrideCompress)
{
@@ -287,7 +292,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
transceiver = connector->connect(timeout);
assert(transceiver);
}
- connection = new ConnectionI(_instance, transceiver, endpoint, 0);
+ connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection,
+ _instance->threadPerConnectionStackSize());
connection->validate();
if(_instance->defaultsAndOverrides()->overrideCompress)
@@ -655,21 +661,21 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests()
bool
IceInternal::IncomingConnectionFactory::datagram() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram();
}
bool
IceInternal::IncomingConnectionFactory::readable() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return false;
}
void
IceInternal::IncomingConnectionFactory::read(BasicStream&)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(false); // Must not be called.
}
@@ -695,7 +701,7 @@ private:
void
IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
ConnectionIPtr connection;
@@ -758,7 +764,8 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
}
catch(const LocalException&)
{
@@ -792,7 +799,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
void
IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -854,6 +861,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
}
+ ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get());
+ _threadPerConnection = adapterImpl->getThreadPerConnection();
+ _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize();
+
const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint));
if(_transceiver)
{
@@ -861,7 +872,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
try
{
- connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
connection->validate();
}
catch(const LocalException&)
@@ -889,7 +901,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
__setNoDelete(true);
try
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we also use
@@ -897,16 +909,16 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
// accepts new connections on this endpoint.
//
_threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
- _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize());
+ _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize);
}
else
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse();
+ adapterImpl->getThreadPool()->incFdsInUse();
}
}
catch(const IceUtil::Exception& ex)
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
Error out(_instance->initializationData().logger);
out << "cannot create thread for incoming connection factory:\n" << ex;
@@ -952,7 +964,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection() && _acceptor)
+ if(!_threadPerConnection && _acceptor)
{
registerWithPool();
}
@@ -966,7 +978,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection() && _acceptor)
+ if(!_threadPerConnection && _acceptor)
{
unregisterWithPool();
}
@@ -978,7 +990,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_acceptor)
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we connect
@@ -1018,7 +1030,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
void
IceInternal::IncomingConnectionFactory::registerWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(_acceptor); // Not for datagram connections.
if(!_registeredWithPool)
@@ -1031,7 +1043,7 @@ IceInternal::IncomingConnectionFactory::registerWithPool()
void
IceInternal::IncomingConnectionFactory::unregisterWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(_acceptor); // Not for datagram connections.
if(_registeredWithPool)
@@ -1132,7 +1144,8 @@ IceInternal::IncomingConnectionFactory::run()
{
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
}
catch(const LocalException&)
{