summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/CHANGES3
-rw-r--r--cpp/config/PropertyNames.def2
-rw-r--r--cpp/include/Ice/Proxy.h3
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp65
-rw-r--r--cpp/src/Ice/ConnectionFactory.h5
-rw-r--r--cpp/src/Ice/ConnectionI.cpp86
-rw-r--r--cpp/src/Ice/ConnectionI.h11
-rw-r--r--cpp/src/Ice/Instance.cpp17
-rw-r--r--cpp/src/Ice/Instance.h4
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp76
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h4
-rw-r--r--cpp/src/Ice/PropertyNames.cpp4
-rw-r--r--cpp/src/Ice/PropertyNames.h2
-rw-r--r--cpp/src/Ice/Proxy.cpp22
-rw-r--r--cpp/src/Ice/Reference.cpp65
-rw-r--r--cpp/src/Ice/Reference.h7
-rw-r--r--cpp/src/Ice/ThreadPool.cpp6
17 files changed, 235 insertions, 147 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES
index b96177474cb..b93cbf2ac7c 100644
--- a/cpp/CHANGES
+++ b/cpp/CHANGES
@@ -1,6 +1,9 @@
Changes since version 3.1.1
---------------------------
+- Proxies and object adapters can now be configured to use
+ thread-per-connection.
+
- IceBox services no longer inherit the properties of the container
by default. If this is the desired behavior set the property
IceBox.InheritContainerProperties to 1.
diff --git a/cpp/config/PropertyNames.def b/cpp/config/PropertyNames.def
index 86d880c861c..a18f94b95f4 100644
--- a/cpp/config/PropertyNames.def
+++ b/cpp/config/PropertyNames.def
@@ -157,6 +157,8 @@ Ice:
OA.<any>.RegisterProcess
OA.<any>.ReplicaGroupId
OA.<any>.Router
+ OA.<any>.ThreadPerConnection
+ OA.<any>.ThreadPerConnection.StackSize
OA.<any>.ThreadPool.Size
OA.<any>.ThreadPool.SizeMax
OA.<any>.ThreadPool.SizeWarn
diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h
index e488e5d722b..3c7d1b3912a 100644
--- a/cpp/include/Ice/Proxy.h
+++ b/cpp/include/Ice/Proxy.h
@@ -231,6 +231,9 @@ public:
::Ice::ObjectPrx ice_timeout(int) const;
::Ice::ObjectPrx ice_connectionId(const ::std::string&) const;
+ bool ice_isThreadPerConnection() const;
+ ::Ice::ObjectPrx ice_threadPerConnection(bool) const;
+
ICE_DEPRECATED_API ::Ice::ConnectionPtr ice_connection();
::Ice::ConnectionPtr ice_getConnection();
::Ice::ConnectionPtr ice_getCachedConnection() const;
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index fd1a188387c..b82c780e1a2 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -93,7 +93,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
}
ConnectionIPtr
-IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, bool& compress)
+IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts,
+ bool threadPerConnection, bool& compress)
{
assert(!endpts.empty());
vector<EndpointIPtr> endpoints = endpts;
@@ -158,9 +159,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
{
//
// Don't return connections for which destruction has
- // been initiated.
+ // been initiated. The connection must also match the
+ // requested thread-per-connection setting.
//
- if(!pr.first->second->isDestroyed())
+ if(!pr.first->second->isDestroyed() &&
+ pr.first->second->threadPerConnection() == threadPerConnection)
{
if(_instance->defaultsAndOverrides()->overrideCompress)
{
@@ -222,11 +225,13 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
while(pr.first != pr.second)
{
- //
- // Don't return connections for which destruction has
- // been initiated.
- //
- if(!pr.first->second->isDestroyed())
+ //
+ // Don't return connections for which destruction has
+ // been initiated. The connection must also match the
+ // requested thread-per-connection setting.
+ //
+ if(!pr.first->second->isDestroyed() &&
+ pr.first->second->threadPerConnection() == threadPerConnection)
{
if(_instance->defaultsAndOverrides()->overrideCompress)
{
@@ -287,7 +292,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
transceiver = connector->connect(timeout);
assert(transceiver);
}
- connection = new ConnectionI(_instance, transceiver, endpoint, 0);
+ connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection,
+ _instance->threadPerConnectionStackSize());
connection->validate();
if(_instance->defaultsAndOverrides()->overrideCompress)
@@ -655,21 +661,21 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests()
bool
IceInternal::IncomingConnectionFactory::datagram() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram();
}
bool
IceInternal::IncomingConnectionFactory::readable() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return false;
}
void
IceInternal::IncomingConnectionFactory::read(BasicStream&)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(false); // Must not be called.
}
@@ -695,7 +701,7 @@ private:
void
IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
ConnectionIPtr connection;
@@ -758,7 +764,8 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
}
catch(const LocalException&)
{
@@ -792,7 +799,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
void
IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -854,6 +861,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
}
+ ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get());
+ _threadPerConnection = adapterImpl->getThreadPerConnection();
+ _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize();
+
const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint));
if(_transceiver)
{
@@ -861,7 +872,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
try
{
- connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
connection->validate();
}
catch(const LocalException&)
@@ -889,7 +901,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
__setNoDelete(true);
try
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we also use
@@ -897,16 +909,16 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
// accepts new connections on this endpoint.
//
_threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
- _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize());
+ _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize);
}
else
{
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse();
+ adapterImpl->getThreadPool()->incFdsInUse();
}
}
catch(const IceUtil::Exception& ex)
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
Error out(_instance->initializationData().logger);
out << "cannot create thread for incoming connection factory:\n" << ex;
@@ -952,7 +964,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection() && _acceptor)
+ if(!_threadPerConnection && _acceptor)
{
registerWithPool();
}
@@ -966,7 +978,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection() && _acceptor)
+ if(!_threadPerConnection && _acceptor)
{
unregisterWithPool();
}
@@ -978,7 +990,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_acceptor)
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we connect
@@ -1018,7 +1030,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
void
IceInternal::IncomingConnectionFactory::registerWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(_acceptor); // Not for datagram connections.
if(!_registeredWithPool)
@@ -1031,7 +1043,7 @@ IceInternal::IncomingConnectionFactory::registerWithPool()
void
IceInternal::IncomingConnectionFactory::unregisterWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
assert(_acceptor); // Not for datagram connections.
if(_registeredWithPool)
@@ -1132,7 +1144,8 @@ IceInternal::IncomingConnectionFactory::run()
{
try
{
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
+ _threadPerConnectionStackSize);
}
catch(const LocalException&)
{
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 830175f28fd..d7e58df05f5 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -44,7 +44,7 @@ public:
void waitUntilFinished();
- Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool, bool&);
+ Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool, bool, bool&);
void setRouterInfo(const RouterInfoPtr&);
void removeAdapter(const Ice::ObjectAdapterPtr&);
void flushBatchRequests();
@@ -136,6 +136,9 @@ private:
std::list<Ice::ConnectionIPtr> _connections;
State _state;
+
+ bool _threadPerConnection;
+ size_t _threadPerConnectionStackSize;
};
}
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 13d107d238e..84156e8662a 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -44,7 +44,7 @@ Ice::ConnectionI::validate()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl())
+ if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl())
{
//
// In thread per connection mode, this connection's thread
@@ -318,7 +318,6 @@ bool
Ice::ConnectionI::isFinished() const
{
IceUtil::ThreadPtr threadPerConnection;
- IceUtil::ThreadPtr secondThreadPerConnection;
{
//
@@ -338,23 +337,15 @@ Ice::ConnectionI::isFinished() const
return false;
}
- if(_threadPerConnection && _threadPerConnection->isAlive())
- {
- return false;
- }
-
- if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive())
+ if(_thread && _thread->isAlive())
{
return false;
}
assert(_state == StateClosed);
- threadPerConnection = _threadPerConnection;
- _threadPerConnection = 0;
-
- secondThreadPerConnection = _secondThreadPerConnection;
- _secondThreadPerConnection = 0;
+ threadPerConnection = _thread;
+ _thread = 0;
}
if(threadPerConnection)
@@ -362,11 +353,6 @@ Ice::ConnectionI::isFinished() const
threadPerConnection->getThreadControl().join();
}
- if(secondThreadPerConnection)
- {
- secondThreadPerConnection->getThreadControl().join();
- }
-
return true;
}
@@ -397,7 +383,6 @@ void
Ice::ConnectionI::waitUntilFinished()
{
IceUtil::ThreadPtr threadPerConnection;
- IceUtil::ThreadPtr secondThreadPerConnection;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -458,11 +443,8 @@ Ice::ConnectionI::waitUntilFinished()
assert(_state == StateClosed);
- threadPerConnection = _threadPerConnection;
- _threadPerConnection = 0;
-
- secondThreadPerConnection = _secondThreadPerConnection;
- _secondThreadPerConnection = 0;
+ threadPerConnection = _thread;
+ _thread = 0;
//
// Clear the OA. See
@@ -476,11 +458,6 @@ Ice::ConnectionI::waitUntilFinished()
{
threadPerConnection->getThreadControl().join();
}
-
- if(secondThreadPerConnection)
- {
- secondThreadPerConnection->getThreadControl().join();
- }
}
void
@@ -1306,6 +1283,12 @@ Ice::ConnectionI::endpoint() const
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
+bool
+Ice::ConnectionI::threadPerConnection() const
+{
+ return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable.
+}
+
void
Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
{
@@ -1572,8 +1555,11 @@ Ice::ConnectionI::getTransceiver() const
Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
const TransceiverPtr& transceiver,
const EndpointIPtr& endpoint,
- const ObjectAdapterPtr& adapter) :
+ const ObjectAdapterPtr& adapter,
+ bool threadPerConnection,
+ size_t threadPerConnectionStackSize) :
EventHandler(instance),
+ _threadPerConnection(threadPerConnection),
_transceiver(transceiver),
_desc(transceiver->toString()),
_type(transceiver->type()),
@@ -1638,7 +1624,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
__setNoDelete(true);
try
{
- if(_instance->threadPerConnection() == 0)
+ if(!threadPerConnection)
{
//
// Only set _threadPool if we really need it, i.e., if we are
@@ -1662,25 +1648,15 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
// If we are in thread per connection mode, create the
// thread for this connection.
//
- _threadPerConnection = new ThreadPerConnection(this, false);
- _threadPerConnection->start(_instance->threadPerConnectionStackSize());
-
- if(_instance->threadPerConnection() == 2)
- {
- //
- // If we are in two threads per connection mode,
- // create the second thread for this connection.
- //
- _secondThreadPerConnection = new ThreadPerConnection(this, true);
- _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize());
- }
+ _thread = new ThreadPerConnection(this);
+ _thread->start(threadPerConnectionStackSize);
}
}
catch(const IceUtil::Exception& ex)
{
{
Error out(_logger);
- if(_instance->threadPerConnection() > 0)
+ if(threadPerConnection)
{
out << "cannot create thread for connection:\n" << ex;
}
@@ -1708,8 +1684,7 @@ Ice::ConnectionI::~ConnectionI()
assert(_state == StateClosed);
assert(!_transceiver);
assert(_dispatchCount == 0);
- assert(!_threadPerConnection);
- assert(!_secondThreadPerConnection);
+ assert(!_thread);
}
void
@@ -2695,14 +2670,8 @@ Ice::ConnectionI::run()
}
}
-void
-Ice::ConnectionI::runSecond()
-{
-}
-
-Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) :
- _connection(connection),
- _second(second)
+Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) :
+ _connection(connection)
{
}
@@ -2716,14 +2685,7 @@ Ice::ConnectionI::ThreadPerConnection::run()
try
{
- if(_second)
- {
- _connection->runSecond();
- }
- else
- {
- _connection->run();
- }
+ _connection->run();
}
catch(const Exception& ex)
{
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index bf8f09ce2df..626f1c776be 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -77,6 +77,7 @@ public:
void sendNoResponse();
IceInternal::EndpointIPtr endpoint() const;
+ bool threadPerConnection() const;
virtual void setAdapter(const ObjectAdapterPtr&); // From Connection.
virtual ObjectAdapterPtr getAdapter() const; // From Connection.
@@ -102,7 +103,7 @@ public:
private:
ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&,
- const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&);
+ const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&, bool, size_t);
virtual ~ConnectionI();
friend class IceInternal::IncomingConnectionFactory;
friend class IceInternal::OutgoingConnectionFactory;
@@ -136,24 +137,22 @@ private:
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
void run(); // For thread per connection.
- void runSecond(); // For second thread per connection.
class ThreadPerConnection : public IceUtil::Thread
{
public:
- ThreadPerConnection(const ConnectionIPtr&, bool);
+ ThreadPerConnection(const ConnectionIPtr&);
virtual void run();
private:
ConnectionIPtr _connection;
- const bool _second;
};
friend class ThreadPerConnection;
// Defined as mutable because "isFinished() const" sets this to 0.
- mutable IceUtil::ThreadPtr _threadPerConnection;
- mutable IceUtil::ThreadPtr _secondThreadPerConnection;
+ mutable IceUtil::ThreadPtr _thread;
+ const bool _threadPerConnection;
IceInternal::TransceiverPtr _transceiver;
const std::string _desc;
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index b6a13245d53..5548d792c9b 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -232,7 +232,7 @@ IceInternal::Instance::serverThreadPool()
return _serverThreadPool;
}
-int
+bool
IceInternal::Instance::threadPerConnection() const
{
// No mutex lock, immutable.
@@ -457,7 +457,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_messageSizeMax(0),
_clientACM(0),
_serverACM(0),
- _threadPerConnection(0),
+ _threadPerConnection(false),
_threadPerConnectionStackSize(0),
_defaultContext(new SharedContext),
_implicitContext(0)
@@ -624,18 +624,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
const_cast<Int&>(_clientACM) = _initData.properties->getPropertyAsIntWithDefault("Ice.ACM.Client", 60);
const_cast<Int&>(_serverACM) = _initData.properties->getPropertyAsInt("Ice.ACM.Server");
- {
- Int threadPerConnection = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection");
- if(threadPerConnection < 0)
- {
- threadPerConnection = 0;
- }
- if(threadPerConnection > 2)
- {
- threadPerConnection = 2;
- }
- const_cast<Int&>(_threadPerConnection) = threadPerConnection;
- }
+ const_cast<bool&>(_threadPerConnection) = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection") > 0;
{
Int stackSize = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize");
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index c876ace11bb..3a4245ff2b7 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -64,7 +64,7 @@ public:
ObjectAdapterFactoryPtr objectAdapterFactory() const;
ThreadPoolPtr clientThreadPool();
ThreadPoolPtr serverThreadPool();
- Ice::Int threadPerConnection() const;
+ bool threadPerConnection() const;
size_t threadPerConnectionStackSize() const;
EndpointFactoryManagerPtr endpointFactoryManager() const;
DynamicLibraryListPtr dynamicLibraryList() const;
@@ -115,7 +115,7 @@ private:
ObjectAdapterFactoryPtr _objectAdapterFactory;
ThreadPoolPtr _clientThreadPool;
ThreadPoolPtr _serverThreadPool;
- const Ice::Int _threadPerConnection;
+ const bool _threadPerConnection;
const size_t _threadPerConnectionStackSize;
EndpointFactoryManagerPtr _endpointFactoryManager;
DynamicLibraryListPtr _dynamicLibraryList;
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index 9338f35bd70..1ee53d618e5 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -645,7 +645,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrx& proxy) const
}
}
}
-
+
return false;
}
@@ -717,6 +717,24 @@ Ice::ObjectAdapterI::getServantManager() const
return _servantManager;
}
+bool
+Ice::ObjectAdapterI::getThreadPerConnection() const
+{
+ //
+ // No mutex lock necessary, _threadPerConnection is immutable.
+ //
+ return _threadPerConnection;
+}
+
+size_t
+Ice::ObjectAdapterI::getThreadPerConnectionStackSize() const
+{
+ //
+ // No mutex lock necessary, _threadPerConnectionStackSize is immutable.
+ //
+ return _threadPerConnectionStackSize;
+}
+
Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const CommunicatorPtr& communicator,
const ObjectAdapterFactoryPtr& objectAdapterFactory, const string& name,
const string& endpointInfo, const RouterPrx& router, bool noConfig) :
@@ -777,30 +795,52 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica
__setNoDelete(true);
try
{
- // First create the per-adapter thread pool, if
- // necessary. This is done before the creation of the incoming
- // connection factory as the thread pool is needed during
- // creation for the call to incFdsInUse.
- if(!_instance->threadPerConnection())
+ _threadPerConnection = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPerConnection") > 0;
+
+ int threadPoolSize = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size");
+ if(threadPoolSize == 0)
+ {
+ threadPoolSize = properties->getPropertyAsInt(_name + ".ThreadPool.Size");
+ }
+ int threadPoolSizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax");
+ if(threadPoolSizeMax == 0)
+ {
+ threadPoolSizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax");
+ }
+
+ if(_threadPerConnection && (threadPoolSize > 0 || threadPoolSizeMax > 0))
+ {
+ InitializationException ex(__FILE__, __LINE__);
+ ex.reason = "adapter cannot be configured for both thread pool and thread per connection";
+ throw ex;
+ }
+
+ if(!_threadPerConnection && threadPoolSize == 0 && threadPoolSizeMax == 0)
+ {
+ _threadPerConnection = _instance->threadPerConnection();
+ }
+
+ if(_threadPerConnection)
+ {
+ _threadPerConnectionStackSize =
+ properties->getPropertyAsIntWithDefault(_propertyPrefix + _name + ".ThreadPerConnection.StackSize",
+ _instance->threadPerConnectionStackSize());
+ }
+
+ //
+ // Create the per-adapter thread pool, if necessary. This is done before the creation of the incoming
+ // connection factory as the thread pool is needed during creation for the call to incFdsInUse.
+ //
+ if(threadPoolSize > 0 || threadPoolSizeMax > 0)
{
if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() ||
!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty())
{
- int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size");
- int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax");
- if(size > 0 || sizeMax > 0)
- {
- _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0);
- }
+ _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0);
}
else
{
- int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size");
- int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax");
- if(size > 0 || sizeMax > 0)
- {
- _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0);
- }
+ _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0);
}
}
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index 04d686ac4c0..e9b6fe0cb6f 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -83,6 +83,8 @@ public:
IceInternal::ThreadPoolPtr getThreadPool() const;
IceInternal::ServantManagerPtr getServantManager() const;
+ bool getThreadPerConnection() const;
+ size_t getThreadPerConnectionStackSize() const;
private:
@@ -121,6 +123,8 @@ private:
bool _destroying;
bool _destroyed;
bool _noConfig;
+ bool _threadPerConnection;
+ size_t _threadPerConnectionStackSize;
static std::string _propertyPrefix;
diff --git a/cpp/src/Ice/PropertyNames.cpp b/cpp/src/Ice/PropertyNames.cpp
index 1ee73935685..b4a8a7904f9 100644
--- a/cpp/src/Ice/PropertyNames.cpp
+++ b/cpp/src/Ice/PropertyNames.cpp
@@ -7,7 +7,7 @@
//
// **********************************************************************
-// Generated by makeprops.py from file `../../config/PropertyNames.def', Thu Jan 25 13:02:24 2007
+// Generated by makeprops.py from file `../config/PropertyNames.def', Thu Jan 25 08:38:23 2007
// IMPORTANT: Do not edit this file -- any edits made here will be lost!
@@ -59,6 +59,8 @@ const char* IceInternal::PropertyNames::IceProps[] =
"Ice.OA.*.RegisterProcess",
"Ice.OA.*.ReplicaGroupId",
"Ice.OA.*.Router",
+ "Ice.OA.*.ThreadPerConnection",
+ "Ice.OA.*.ThreadPerConnection.StackSize",
"Ice.OA.*.ThreadPool.Size",
"Ice.OA.*.ThreadPool.SizeMax",
"Ice.OA.*.ThreadPool.SizeWarn",
diff --git a/cpp/src/Ice/PropertyNames.h b/cpp/src/Ice/PropertyNames.h
index a864075c5b5..bd9b42c1da9 100644
--- a/cpp/src/Ice/PropertyNames.h
+++ b/cpp/src/Ice/PropertyNames.h
@@ -7,7 +7,7 @@
//
// **********************************************************************
-// Generated by makeprops.py from file `../../config/PropertyNames.def', Thu Jan 25 13:02:24 2007
+// Generated by makeprops.py from file `../config/PropertyNames.def', Thu Jan 25 08:38:23 2007
// IMPORTANT: Do not edit this file -- any edits made here will be lost!
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 4f983eb98be..eb20297fb6a 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -800,6 +800,28 @@ IceProxy::Ice::Object::ice_connectionId(const string& id) const
}
}
+bool
+IceProxy::Ice::Object::ice_isThreadPerConnection() const
+{
+ return _reference->getThreadPerConnection();
+}
+
+ObjectPrx
+IceProxy::Ice::Object::ice_threadPerConnection(bool b) const
+{
+ ReferencePtr ref = _reference->changeThreadPerConnection(b);
+ if(ref == _reference)
+ {
+ return ObjectPrx(const_cast< ::IceProxy::Ice::Object*>(this));
+ }
+ else
+ {
+ ObjectPrx proxy(new ::IceProxy::Ice::Object());
+ proxy->setup(ref);
+ return proxy;
+ }
+}
+
ConnectionPtr
IceProxy::Ice::Object::ice_connection()
{
diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp
index 9846f91e86b..2bd51e2da2d 100644
--- a/cpp/src/Ice/Reference.cpp
+++ b/cpp/src/Ice/Reference.cpp
@@ -492,6 +492,12 @@ IceInternal::FixedReference::getEndpointSelection() const
return Random;
}
+bool
+IceInternal::FixedReference::getThreadPerConnection() const
+{
+ return false;
+}
+
ReferencePtr
IceInternal::FixedReference::changeSecure(bool) const
{
@@ -586,6 +592,13 @@ IceInternal::FixedReference::changeEndpointSelection(EndpointSelectionType) cons
return 0; // Keep the compiler happy.
}
+ReferencePtr
+IceInternal::FixedReference::changeThreadPerConnection(bool) const
+{
+ throw FixedProxyException(__FILE__, __LINE__);
+ return 0; // Keep the compiler happy.
+}
+
void
IceInternal::FixedReference::streamWrite(BasicStream* s) const
{
@@ -783,6 +796,12 @@ IceInternal::RoutableReference::getEndpointSelection() const
return _endpointSelection;
}
+bool
+IceInternal::RoutableReference::getThreadPerConnection() const
+{
+ return _threadPerConnection;
+}
+
ReferencePtr
IceInternal::RoutableReference::changeSecure(bool newSecure) const
{
@@ -894,6 +913,18 @@ IceInternal::RoutableReference::changeEndpointSelection(EndpointSelectionType ne
return r;
}
+ReferencePtr
+IceInternal::RoutableReference::changeThreadPerConnection(bool newValue) const
+{
+ if(newValue == _threadPerConnection)
+ {
+ return RoutableReferencePtr(const_cast<RoutableReference*>(this));
+ }
+ RoutableReferencePtr r = RoutableReferencePtr::dynamicCast(getInstance()->referenceFactory()->copy(this));
+ r->_threadPerConnection = newValue;
+ return r;
+}
+
int
IceInternal::RoutableReference::hash() const
{
@@ -944,6 +975,10 @@ IceInternal::RoutableReference::operator==(const Reference& r) const
{
return false;
}
+ if(_threadPerConnection != rhs->_threadPerConnection)
+ {
+ return false;
+ }
return _routerInfo == rhs->_routerInfo;
}
@@ -1054,6 +1089,14 @@ IceInternal::RoutableReference::operator<(const Reference& r) const
return false;
}
}
+ if(!_threadPerConnection && rhs->_threadPerConnection)
+ {
+ return true;
+ }
+ else if(rhs->_threadPerConnection < _threadPerConnection)
+ {
+ return false;
+ }
return _routerInfo < rhs->_routerInfo;
}
return false;
@@ -1073,7 +1116,8 @@ IceInternal::RoutableReference::RoutableReference(const InstancePtr& inst, const
_overrideCompress(false),
_compress(false),
_overrideTimeout(false),
- _timeout(-1)
+ _timeout(-1),
+ _threadPerConnection(inst->threadPerConnection())
{
}
@@ -1089,7 +1133,8 @@ IceInternal::RoutableReference::RoutableReference(const RoutableReference& r) :
_overrideCompress(r._overrideCompress),
_compress(r._compress),
_overrideTimeout(r._overrideTimeout),
- _timeout(r._timeout)
+ _timeout(r._timeout),
+ _threadPerConnection(r._threadPerConnection)
{
}
@@ -1205,7 +1250,7 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all
// Get an existing connection or create one if there's no
// existing connection to one of the given endpoints.
//
- return factory->create(endpoints, false, comp);
+ return factory->create(endpoints, false, _threadPerConnection, comp);
}
else
{
@@ -1226,7 +1271,7 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all
try
{
endpoint.back() = *p;
- return factory->create(endpoint, p + 1 == endpoints.end(), comp);
+ return factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection, comp);
}
catch(const LocalException& ex)
{
@@ -1348,6 +1393,12 @@ IceInternal::DirectReference::changeConnectionId(const string& newConnectionId)
}
ReferencePtr
+IceInternal::DirectReference::changeLocatorCacheTimeout(int) const
+{
+ return DirectReferencePtr(const_cast<DirectReference*>(this));
+}
+
+ReferencePtr
IceInternal::DirectReference::changeAdapterId(const string& newAdapterId) const
{
if(!newAdapterId.empty())
@@ -1378,12 +1429,6 @@ IceInternal::DirectReference::changeEndpoints(const vector<EndpointIPtr>& newEnd
return r;
}
-ReferencePtr
-IceInternal::DirectReference::changeLocatorCacheTimeout(int) const
-{
- return DirectReferencePtr(const_cast<DirectReference*>(this));
-}
-
void
IceInternal::DirectReference::streamWrite(BasicStream* s) const
{
diff --git a/cpp/src/Ice/Reference.h b/cpp/src/Ice/Reference.h
index ad2d32a551b..ec0e14c2736 100644
--- a/cpp/src/Ice/Reference.h
+++ b/cpp/src/Ice/Reference.h
@@ -72,6 +72,7 @@ public:
virtual int getLocatorCacheTimeout() const = 0;
virtual bool getCacheConnection() const = 0;
virtual Ice::EndpointSelectionType getEndpointSelection() const = 0;
+ virtual bool getThreadPerConnection() const = 0;
//
// The change* methods (here and in derived classes) create
@@ -96,6 +97,7 @@ public:
virtual ReferencePtr changeLocatorCacheTimeout(int) const = 0;
virtual ReferencePtr changeCacheConnection(bool) const = 0;
virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const = 0;
+ virtual ReferencePtr changeThreadPerConnection(bool) const = 0;
virtual int hash() const; // Conceptually const.
@@ -159,6 +161,7 @@ public:
virtual int getLocatorCacheTimeout() const;
virtual bool getCacheConnection() const;
virtual Ice::EndpointSelectionType getEndpointSelection() const;
+ virtual bool getThreadPerConnection() const;
virtual ReferencePtr changeSecure(bool) const;
virtual ReferencePtr changePreferSecure(bool) const;
@@ -173,6 +176,7 @@ public:
virtual ReferencePtr changeEndpoints(const std::vector<EndpointIPtr>&) const;
virtual ReferencePtr changeCacheConnection(bool) const;
virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const;
+ virtual ReferencePtr changeThreadPerConnection(bool) const;
virtual void streamWrite(BasicStream*) const;
virtual std::string toString() const;
@@ -208,6 +212,7 @@ public:
virtual bool getCollocationOptimization() const;
virtual bool getCacheConnection() const;
virtual Ice::EndpointSelectionType getEndpointSelection() const;
+ virtual bool getThreadPerConnection() const;
virtual ReferencePtr changeSecure(bool) const;
virtual ReferencePtr changePreferSecure(bool) const;
@@ -218,6 +223,7 @@ public:
virtual ReferencePtr changeConnectionId(const std::string&) const;
virtual ReferencePtr changeCacheConnection(bool) const;
virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const;
+ virtual ReferencePtr changeThreadPerConnection(bool) const;
virtual Ice::ConnectionIPtr getConnection(bool&) const = 0;
@@ -252,6 +258,7 @@ private:
bool _compress; // Only used if _overrideCompress == true
bool _overrideTimeout;
int _timeout; // Only used if _overrideTimeout == true
+ bool _threadPerConnection;
};
class DirectReference : public RoutableReference
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 5240889b537..87b315b9cba 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -45,12 +45,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_promote(true),
_warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
{
- //
- // If we are in thread per connection mode, no thread pool should
- // ever be created.
- //
- assert(!_instance->threadPerConnection());
-
SOCKET fds[2];
createPipe(fds);
_fdIntrRead = fds[0];