diff options
author | Mark Spruiell <mes@zeroc.com> | 2007-01-25 16:50:20 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2007-01-25 16:50:20 +0000 |
commit | 2af1be4b75d36ed2022c304c9030ff34162d44db (patch) | |
tree | 229182241b85b3dd9b57cf56a02e774b83fcd47a /cpp | |
parent | IceGrid file cache fixes (diff) | |
download | ice-2af1be4b75d36ed2022c304c9030ff34162d44db.tar.bz2 ice-2af1be4b75d36ed2022c304c9030ff34162d44db.tar.xz ice-2af1be4b75d36ed2022c304c9030ff34162d44db.zip |
adding thread-per-connection settings for proxies & OAs
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/CHANGES | 3 | ||||
-rw-r--r-- | cpp/config/PropertyNames.def | 2 | ||||
-rw-r--r-- | cpp/include/Ice/Proxy.h | 3 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 65 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 5 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 86 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 11 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 17 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ObjectAdapterI.cpp | 76 | ||||
-rw-r--r-- | cpp/src/Ice/ObjectAdapterI.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/PropertyNames.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/PropertyNames.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 22 | ||||
-rw-r--r-- | cpp/src/Ice/Reference.cpp | 65 | ||||
-rw-r--r-- | cpp/src/Ice/Reference.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 6 |
17 files changed, 235 insertions, 147 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES index b96177474cb..b93cbf2ac7c 100644 --- a/cpp/CHANGES +++ b/cpp/CHANGES @@ -1,6 +1,9 @@ Changes since version 3.1.1 --------------------------- +- Proxies and object adapters can now be configured to use + thread-per-connection. + - IceBox services no longer inherit the properties of the container by default. If this is the desired behavior set the property IceBox.InheritContainerProperties to 1. diff --git a/cpp/config/PropertyNames.def b/cpp/config/PropertyNames.def index 86d880c861c..a18f94b95f4 100644 --- a/cpp/config/PropertyNames.def +++ b/cpp/config/PropertyNames.def @@ -157,6 +157,8 @@ Ice: OA.<any>.RegisterProcess OA.<any>.ReplicaGroupId OA.<any>.Router + OA.<any>.ThreadPerConnection + OA.<any>.ThreadPerConnection.StackSize OA.<any>.ThreadPool.Size OA.<any>.ThreadPool.SizeMax OA.<any>.ThreadPool.SizeWarn diff --git a/cpp/include/Ice/Proxy.h b/cpp/include/Ice/Proxy.h index e488e5d722b..3c7d1b3912a 100644 --- a/cpp/include/Ice/Proxy.h +++ b/cpp/include/Ice/Proxy.h @@ -231,6 +231,9 @@ public: ::Ice::ObjectPrx ice_timeout(int) const; ::Ice::ObjectPrx ice_connectionId(const ::std::string&) const; + bool ice_isThreadPerConnection() const; + ::Ice::ObjectPrx ice_threadPerConnection(bool) const; + ICE_DEPRECATED_API ::Ice::ConnectionPtr ice_connection(); ::Ice::ConnectionPtr ice_getConnection(); ::Ice::ConnectionPtr ice_getCachedConnection() const; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index fd1a188387c..b82c780e1a2 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -93,7 +93,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } ConnectionIPtr -IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, bool& compress) +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, + bool threadPerConnection, bool& compress) { assert(!endpts.empty()); vector<EndpointIPtr> endpoints = endpts; @@ -158,9 +159,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt { // // Don't return connections for which destruction has - // been initiated. + // been initiated. The connection must also match the + // requested thread-per-connection setting. // - if(!pr.first->second->isDestroyed()) + if(!pr.first->second->isDestroyed() && + pr.first->second->threadPerConnection() == threadPerConnection) { if(_instance->defaultsAndOverrides()->overrideCompress) { @@ -222,11 +225,13 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt while(pr.first != pr.second) { - // - // Don't return connections for which destruction has - // been initiated. - // - if(!pr.first->second->isDestroyed()) + // + // Don't return connections for which destruction has + // been initiated. The connection must also match the + // requested thread-per-connection setting. + // + if(!pr.first->second->isDestroyed() && + pr.first->second->threadPerConnection() == threadPerConnection) { if(_instance->defaultsAndOverrides()->overrideCompress) { @@ -287,7 +292,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt transceiver = connector->connect(timeout); assert(transceiver); } - connection = new ConnectionI(_instance, transceiver, endpoint, 0); + connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection, + _instance->threadPerConnectionStackSize()); connection->validate(); if(_instance->defaultsAndOverrides()->overrideCompress) @@ -655,21 +661,21 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests() bool IceInternal::IncomingConnectionFactory::datagram() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); } bool IceInternal::IncomingConnectionFactory::readable() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return false; } void IceInternal::IncomingConnectionFactory::read(BasicStream&) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(false); // Must not be called. } @@ -695,7 +701,7 @@ private: void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. ConnectionIPtr connection; @@ -758,7 +764,8 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); } catch(const LocalException&) { @@ -792,7 +799,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt void IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -854,6 +861,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); } + ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get()); + _threadPerConnection = adapterImpl->getThreadPerConnection(); + _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize(); + const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint)); if(_transceiver) { @@ -861,7 +872,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance try { - connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); connection->validate(); } catch(const LocalException&) @@ -889,7 +901,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance __setNoDelete(true); try { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we also use @@ -897,16 +909,16 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance // accepts new connections on this endpoint. // _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); - _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize()); + _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize); } else { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse(); + adapterImpl->getThreadPool()->incFdsInUse(); } } catch(const IceUtil::Exception& ex) { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { Error out(_instance->initializationData().logger); out << "cannot create thread for incoming connection factory:\n" << ex; @@ -952,7 +964,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection() && _acceptor) + if(!_threadPerConnection && _acceptor) { registerWithPool(); } @@ -966,7 +978,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection() && _acceptor) + if(!_threadPerConnection && _acceptor) { unregisterWithPool(); } @@ -978,7 +990,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_acceptor) { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we connect @@ -1018,7 +1030,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) void IceInternal::IncomingConnectionFactory::registerWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(_acceptor); // Not for datagram connections. if(!_registeredWithPool) @@ -1031,7 +1043,7 @@ IceInternal::IncomingConnectionFactory::registerWithPool() void IceInternal::IncomingConnectionFactory::unregisterWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(_acceptor); // Not for datagram connections. if(_registeredWithPool) @@ -1132,7 +1144,8 @@ IceInternal::IncomingConnectionFactory::run() { try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); } catch(const LocalException&) { diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 830175f28fd..d7e58df05f5 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -44,7 +44,7 @@ public: void waitUntilFinished(); - Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool, bool&); + Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool, bool, bool&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); void flushBatchRequests(); @@ -136,6 +136,9 @@ private: std::list<Ice::ConnectionIPtr> _connections; State _state; + + bool _threadPerConnection; + size_t _threadPerConnectionStackSize; }; } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 13d107d238e..84156e8662a 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -44,7 +44,7 @@ Ice::ConnectionI::validate() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl()) + if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl()) { // // In thread per connection mode, this connection's thread @@ -318,7 +318,6 @@ bool Ice::ConnectionI::isFinished() const { IceUtil::ThreadPtr threadPerConnection; - IceUtil::ThreadPtr secondThreadPerConnection; { // @@ -338,23 +337,15 @@ Ice::ConnectionI::isFinished() const return false; } - if(_threadPerConnection && _threadPerConnection->isAlive()) - { - return false; - } - - if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive()) + if(_thread && _thread->isAlive()) { return false; } assert(_state == StateClosed); - threadPerConnection = _threadPerConnection; - _threadPerConnection = 0; - - secondThreadPerConnection = _secondThreadPerConnection; - _secondThreadPerConnection = 0; + threadPerConnection = _thread; + _thread = 0; } if(threadPerConnection) @@ -362,11 +353,6 @@ Ice::ConnectionI::isFinished() const threadPerConnection->getThreadControl().join(); } - if(secondThreadPerConnection) - { - secondThreadPerConnection->getThreadControl().join(); - } - return true; } @@ -397,7 +383,6 @@ void Ice::ConnectionI::waitUntilFinished() { IceUtil::ThreadPtr threadPerConnection; - IceUtil::ThreadPtr secondThreadPerConnection; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -458,11 +443,8 @@ Ice::ConnectionI::waitUntilFinished() assert(_state == StateClosed); - threadPerConnection = _threadPerConnection; - _threadPerConnection = 0; - - secondThreadPerConnection = _secondThreadPerConnection; - _secondThreadPerConnection = 0; + threadPerConnection = _thread; + _thread = 0; // // Clear the OA. See @@ -476,11 +458,6 @@ Ice::ConnectionI::waitUntilFinished() { threadPerConnection->getThreadControl().join(); } - - if(secondThreadPerConnection) - { - secondThreadPerConnection->getThreadControl().join(); - } } void @@ -1306,6 +1283,12 @@ Ice::ConnectionI::endpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } +bool +Ice::ConnectionI::threadPerConnection() const +{ + return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable. +} + void Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) { @@ -1572,8 +1555,11 @@ Ice::ConnectionI::getTransceiver() const Ice::ConnectionI::ConnectionI(const InstancePtr& instance, const TransceiverPtr& transceiver, const EndpointIPtr& endpoint, - const ObjectAdapterPtr& adapter) : + const ObjectAdapterPtr& adapter, + bool threadPerConnection, + size_t threadPerConnectionStackSize) : EventHandler(instance), + _threadPerConnection(threadPerConnection), _transceiver(transceiver), _desc(transceiver->toString()), _type(transceiver->type()), @@ -1638,7 +1624,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, __setNoDelete(true); try { - if(_instance->threadPerConnection() == 0) + if(!threadPerConnection) { // // Only set _threadPool if we really need it, i.e., if we are @@ -1662,25 +1648,15 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, // If we are in thread per connection mode, create the // thread for this connection. // - _threadPerConnection = new ThreadPerConnection(this, false); - _threadPerConnection->start(_instance->threadPerConnectionStackSize()); - - if(_instance->threadPerConnection() == 2) - { - // - // If we are in two threads per connection mode, - // create the second thread for this connection. - // - _secondThreadPerConnection = new ThreadPerConnection(this, true); - _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize()); - } + _thread = new ThreadPerConnection(this); + _thread->start(threadPerConnectionStackSize); } } catch(const IceUtil::Exception& ex) { { Error out(_logger); - if(_instance->threadPerConnection() > 0) + if(threadPerConnection) { out << "cannot create thread for connection:\n" << ex; } @@ -1708,8 +1684,7 @@ Ice::ConnectionI::~ConnectionI() assert(_state == StateClosed); assert(!_transceiver); assert(_dispatchCount == 0); - assert(!_threadPerConnection); - assert(!_secondThreadPerConnection); + assert(!_thread); } void @@ -2695,14 +2670,8 @@ Ice::ConnectionI::run() } } -void -Ice::ConnectionI::runSecond() -{ -} - -Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) : - _connection(connection), - _second(second) +Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) : + _connection(connection) { } @@ -2716,14 +2685,7 @@ Ice::ConnectionI::ThreadPerConnection::run() try { - if(_second) - { - _connection->runSecond(); - } - else - { - _connection->run(); - } + _connection->run(); } catch(const Exception& ex) { diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index bf8f09ce2df..626f1c776be 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -77,6 +77,7 @@ public: void sendNoResponse(); IceInternal::EndpointIPtr endpoint() const; + bool threadPerConnection() const; virtual void setAdapter(const ObjectAdapterPtr&); // From Connection. virtual ObjectAdapterPtr getAdapter() const; // From Connection. @@ -102,7 +103,7 @@ public: private: ConnectionI(const IceInternal::InstancePtr&, const IceInternal::TransceiverPtr&, - const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&); + const IceInternal::EndpointIPtr&, const ObjectAdapterPtr&, bool, size_t); virtual ~ConnectionI(); friend class IceInternal::IncomingConnectionFactory; friend class IceInternal::OutgoingConnectionFactory; @@ -136,24 +137,22 @@ private: const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); void run(); // For thread per connection. - void runSecond(); // For second thread per connection. class ThreadPerConnection : public IceUtil::Thread { public: - ThreadPerConnection(const ConnectionIPtr&, bool); + ThreadPerConnection(const ConnectionIPtr&); virtual void run(); private: ConnectionIPtr _connection; - const bool _second; }; friend class ThreadPerConnection; // Defined as mutable because "isFinished() const" sets this to 0. - mutable IceUtil::ThreadPtr _threadPerConnection; - mutable IceUtil::ThreadPtr _secondThreadPerConnection; + mutable IceUtil::ThreadPtr _thread; + const bool _threadPerConnection; IceInternal::TransceiverPtr _transceiver; const std::string _desc; diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index b6a13245d53..5548d792c9b 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -232,7 +232,7 @@ IceInternal::Instance::serverThreadPool() return _serverThreadPool; } -int +bool IceInternal::Instance::threadPerConnection() const { // No mutex lock, immutable. @@ -457,7 +457,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _messageSizeMax(0), _clientACM(0), _serverACM(0), - _threadPerConnection(0), + _threadPerConnection(false), _threadPerConnectionStackSize(0), _defaultContext(new SharedContext), _implicitContext(0) @@ -624,18 +624,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi const_cast<Int&>(_clientACM) = _initData.properties->getPropertyAsIntWithDefault("Ice.ACM.Client", 60); const_cast<Int&>(_serverACM) = _initData.properties->getPropertyAsInt("Ice.ACM.Server"); - { - Int threadPerConnection = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection"); - if(threadPerConnection < 0) - { - threadPerConnection = 0; - } - if(threadPerConnection > 2) - { - threadPerConnection = 2; - } - const_cast<Int&>(_threadPerConnection) = threadPerConnection; - } + const_cast<bool&>(_threadPerConnection) = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection") > 0; { Int stackSize = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index c876ace11bb..3a4245ff2b7 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -64,7 +64,7 @@ public: ObjectAdapterFactoryPtr objectAdapterFactory() const; ThreadPoolPtr clientThreadPool(); ThreadPoolPtr serverThreadPool(); - Ice::Int threadPerConnection() const; + bool threadPerConnection() const; size_t threadPerConnectionStackSize() const; EndpointFactoryManagerPtr endpointFactoryManager() const; DynamicLibraryListPtr dynamicLibraryList() const; @@ -115,7 +115,7 @@ private: ObjectAdapterFactoryPtr _objectAdapterFactory; ThreadPoolPtr _clientThreadPool; ThreadPoolPtr _serverThreadPool; - const Ice::Int _threadPerConnection; + const bool _threadPerConnection; const size_t _threadPerConnectionStackSize; EndpointFactoryManagerPtr _endpointFactoryManager; DynamicLibraryListPtr _dynamicLibraryList; diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 9338f35bd70..1ee53d618e5 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -645,7 +645,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrx& proxy) const } } } - + return false; } @@ -717,6 +717,24 @@ Ice::ObjectAdapterI::getServantManager() const return _servantManager; } +bool +Ice::ObjectAdapterI::getThreadPerConnection() const +{ + // + // No mutex lock necessary, _threadPerConnection is immutable. + // + return _threadPerConnection; +} + +size_t +Ice::ObjectAdapterI::getThreadPerConnectionStackSize() const +{ + // + // No mutex lock necessary, _threadPerConnectionStackSize is immutable. + // + return _threadPerConnectionStackSize; +} + Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const CommunicatorPtr& communicator, const ObjectAdapterFactoryPtr& objectAdapterFactory, const string& name, const string& endpointInfo, const RouterPrx& router, bool noConfig) : @@ -777,30 +795,52 @@ Ice::ObjectAdapterI::ObjectAdapterI(const InstancePtr& instance, const Communica __setNoDelete(true); try { - // First create the per-adapter thread pool, if - // necessary. This is done before the creation of the incoming - // connection factory as the thread pool is needed during - // creation for the call to incFdsInUse. - if(!_instance->threadPerConnection()) + _threadPerConnection = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPerConnection") > 0; + + int threadPoolSize = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size"); + if(threadPoolSize == 0) + { + threadPoolSize = properties->getPropertyAsInt(_name + ".ThreadPool.Size"); + } + int threadPoolSizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax"); + if(threadPoolSizeMax == 0) + { + threadPoolSizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax"); + } + + if(_threadPerConnection && (threadPoolSize > 0 || threadPoolSizeMax > 0)) + { + InitializationException ex(__FILE__, __LINE__); + ex.reason = "adapter cannot be configured for both thread pool and thread per connection"; + throw ex; + } + + if(!_threadPerConnection && threadPoolSize == 0 && threadPoolSizeMax == 0) + { + _threadPerConnection = _instance->threadPerConnection(); + } + + if(_threadPerConnection) + { + _threadPerConnectionStackSize = + properties->getPropertyAsIntWithDefault(_propertyPrefix + _name + ".ThreadPerConnection.StackSize", + _instance->threadPerConnectionStackSize()); + } + + // + // Create the per-adapter thread pool, if necessary. This is done before the creation of the incoming + // connection factory as the thread pool is needed during creation for the call to incFdsInUse. + // + if(threadPoolSize > 0 || threadPoolSizeMax > 0) { if(!properties->getProperty(_propertyPrefix + _name + ".ThreadPool.Size").empty() || !properties->getProperty(_propertyPrefix + _name + ".ThreadPool.SizeMax").empty()) { - int size = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.Size"); - int sizeMax = properties->getPropertyAsInt(_propertyPrefix + _name + ".ThreadPool.SizeMax"); - if(size > 0 || sizeMax > 0) - { - _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0); - } + _threadPool = new ThreadPool(_instance, _propertyPrefix + _name + ".ThreadPool", 0); } else { - int size = properties->getPropertyAsInt(_name + ".ThreadPool.Size"); - int sizeMax = properties->getPropertyAsInt(_name + ".ThreadPool.SizeMax"); - if(size > 0 || sizeMax > 0) - { - _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0); - } + _threadPool = new ThreadPool(_instance, _name + ".ThreadPool", 0); } } diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index 04d686ac4c0..e9b6fe0cb6f 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -83,6 +83,8 @@ public: IceInternal::ThreadPoolPtr getThreadPool() const; IceInternal::ServantManagerPtr getServantManager() const; + bool getThreadPerConnection() const; + size_t getThreadPerConnectionStackSize() const; private: @@ -121,6 +123,8 @@ private: bool _destroying; bool _destroyed; bool _noConfig; + bool _threadPerConnection; + size_t _threadPerConnectionStackSize; static std::string _propertyPrefix; diff --git a/cpp/src/Ice/PropertyNames.cpp b/cpp/src/Ice/PropertyNames.cpp index 1ee73935685..b4a8a7904f9 100644 --- a/cpp/src/Ice/PropertyNames.cpp +++ b/cpp/src/Ice/PropertyNames.cpp @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../../config/PropertyNames.def', Thu Jan 25 13:02:24 2007 +// Generated by makeprops.py from file `../config/PropertyNames.def', Thu Jan 25 08:38:23 2007 // IMPORTANT: Do not edit this file -- any edits made here will be lost! @@ -59,6 +59,8 @@ const char* IceInternal::PropertyNames::IceProps[] = "Ice.OA.*.RegisterProcess", "Ice.OA.*.ReplicaGroupId", "Ice.OA.*.Router", + "Ice.OA.*.ThreadPerConnection", + "Ice.OA.*.ThreadPerConnection.StackSize", "Ice.OA.*.ThreadPool.Size", "Ice.OA.*.ThreadPool.SizeMax", "Ice.OA.*.ThreadPool.SizeWarn", diff --git a/cpp/src/Ice/PropertyNames.h b/cpp/src/Ice/PropertyNames.h index a864075c5b5..bd9b42c1da9 100644 --- a/cpp/src/Ice/PropertyNames.h +++ b/cpp/src/Ice/PropertyNames.h @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../../config/PropertyNames.def', Thu Jan 25 13:02:24 2007 +// Generated by makeprops.py from file `../config/PropertyNames.def', Thu Jan 25 08:38:23 2007 // IMPORTANT: Do not edit this file -- any edits made here will be lost! diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 4f983eb98be..eb20297fb6a 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -800,6 +800,28 @@ IceProxy::Ice::Object::ice_connectionId(const string& id) const } } +bool +IceProxy::Ice::Object::ice_isThreadPerConnection() const +{ + return _reference->getThreadPerConnection(); +} + +ObjectPrx +IceProxy::Ice::Object::ice_threadPerConnection(bool b) const +{ + ReferencePtr ref = _reference->changeThreadPerConnection(b); + if(ref == _reference) + { + return ObjectPrx(const_cast< ::IceProxy::Ice::Object*>(this)); + } + else + { + ObjectPrx proxy(new ::IceProxy::Ice::Object()); + proxy->setup(ref); + return proxy; + } +} + ConnectionPtr IceProxy::Ice::Object::ice_connection() { diff --git a/cpp/src/Ice/Reference.cpp b/cpp/src/Ice/Reference.cpp index 9846f91e86b..2bd51e2da2d 100644 --- a/cpp/src/Ice/Reference.cpp +++ b/cpp/src/Ice/Reference.cpp @@ -492,6 +492,12 @@ IceInternal::FixedReference::getEndpointSelection() const return Random; } +bool +IceInternal::FixedReference::getThreadPerConnection() const +{ + return false; +} + ReferencePtr IceInternal::FixedReference::changeSecure(bool) const { @@ -586,6 +592,13 @@ IceInternal::FixedReference::changeEndpointSelection(EndpointSelectionType) cons return 0; // Keep the compiler happy. } +ReferencePtr +IceInternal::FixedReference::changeThreadPerConnection(bool) const +{ + throw FixedProxyException(__FILE__, __LINE__); + return 0; // Keep the compiler happy. +} + void IceInternal::FixedReference::streamWrite(BasicStream* s) const { @@ -783,6 +796,12 @@ IceInternal::RoutableReference::getEndpointSelection() const return _endpointSelection; } +bool +IceInternal::RoutableReference::getThreadPerConnection() const +{ + return _threadPerConnection; +} + ReferencePtr IceInternal::RoutableReference::changeSecure(bool newSecure) const { @@ -894,6 +913,18 @@ IceInternal::RoutableReference::changeEndpointSelection(EndpointSelectionType ne return r; } +ReferencePtr +IceInternal::RoutableReference::changeThreadPerConnection(bool newValue) const +{ + if(newValue == _threadPerConnection) + { + return RoutableReferencePtr(const_cast<RoutableReference*>(this)); + } + RoutableReferencePtr r = RoutableReferencePtr::dynamicCast(getInstance()->referenceFactory()->copy(this)); + r->_threadPerConnection = newValue; + return r; +} + int IceInternal::RoutableReference::hash() const { @@ -944,6 +975,10 @@ IceInternal::RoutableReference::operator==(const Reference& r) const { return false; } + if(_threadPerConnection != rhs->_threadPerConnection) + { + return false; + } return _routerInfo == rhs->_routerInfo; } @@ -1054,6 +1089,14 @@ IceInternal::RoutableReference::operator<(const Reference& r) const return false; } } + if(!_threadPerConnection && rhs->_threadPerConnection) + { + return true; + } + else if(rhs->_threadPerConnection < _threadPerConnection) + { + return false; + } return _routerInfo < rhs->_routerInfo; } return false; @@ -1073,7 +1116,8 @@ IceInternal::RoutableReference::RoutableReference(const InstancePtr& inst, const _overrideCompress(false), _compress(false), _overrideTimeout(false), - _timeout(-1) + _timeout(-1), + _threadPerConnection(inst->threadPerConnection()) { } @@ -1089,7 +1133,8 @@ IceInternal::RoutableReference::RoutableReference(const RoutableReference& r) : _overrideCompress(r._overrideCompress), _compress(r._compress), _overrideTimeout(r._overrideTimeout), - _timeout(r._timeout) + _timeout(r._timeout), + _threadPerConnection(r._threadPerConnection) { } @@ -1205,7 +1250,7 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all // Get an existing connection or create one if there's no // existing connection to one of the given endpoints. // - return factory->create(endpoints, false, comp); + return factory->create(endpoints, false, _threadPerConnection, comp); } else { @@ -1226,7 +1271,7 @@ IceInternal::RoutableReference::createConnection(const vector<EndpointIPtr>& all try { endpoint.back() = *p; - return factory->create(endpoint, p + 1 == endpoints.end(), comp); + return factory->create(endpoint, p + 1 == endpoints.end(), _threadPerConnection, comp); } catch(const LocalException& ex) { @@ -1348,6 +1393,12 @@ IceInternal::DirectReference::changeConnectionId(const string& newConnectionId) } ReferencePtr +IceInternal::DirectReference::changeLocatorCacheTimeout(int) const +{ + return DirectReferencePtr(const_cast<DirectReference*>(this)); +} + +ReferencePtr IceInternal::DirectReference::changeAdapterId(const string& newAdapterId) const { if(!newAdapterId.empty()) @@ -1378,12 +1429,6 @@ IceInternal::DirectReference::changeEndpoints(const vector<EndpointIPtr>& newEnd return r; } -ReferencePtr -IceInternal::DirectReference::changeLocatorCacheTimeout(int) const -{ - return DirectReferencePtr(const_cast<DirectReference*>(this)); -} - void IceInternal::DirectReference::streamWrite(BasicStream* s) const { diff --git a/cpp/src/Ice/Reference.h b/cpp/src/Ice/Reference.h index ad2d32a551b..ec0e14c2736 100644 --- a/cpp/src/Ice/Reference.h +++ b/cpp/src/Ice/Reference.h @@ -72,6 +72,7 @@ public: virtual int getLocatorCacheTimeout() const = 0; virtual bool getCacheConnection() const = 0; virtual Ice::EndpointSelectionType getEndpointSelection() const = 0; + virtual bool getThreadPerConnection() const = 0; // // The change* methods (here and in derived classes) create @@ -96,6 +97,7 @@ public: virtual ReferencePtr changeLocatorCacheTimeout(int) const = 0; virtual ReferencePtr changeCacheConnection(bool) const = 0; virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const = 0; + virtual ReferencePtr changeThreadPerConnection(bool) const = 0; virtual int hash() const; // Conceptually const. @@ -159,6 +161,7 @@ public: virtual int getLocatorCacheTimeout() const; virtual bool getCacheConnection() const; virtual Ice::EndpointSelectionType getEndpointSelection() const; + virtual bool getThreadPerConnection() const; virtual ReferencePtr changeSecure(bool) const; virtual ReferencePtr changePreferSecure(bool) const; @@ -173,6 +176,7 @@ public: virtual ReferencePtr changeEndpoints(const std::vector<EndpointIPtr>&) const; virtual ReferencePtr changeCacheConnection(bool) const; virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const; + virtual ReferencePtr changeThreadPerConnection(bool) const; virtual void streamWrite(BasicStream*) const; virtual std::string toString() const; @@ -208,6 +212,7 @@ public: virtual bool getCollocationOptimization() const; virtual bool getCacheConnection() const; virtual Ice::EndpointSelectionType getEndpointSelection() const; + virtual bool getThreadPerConnection() const; virtual ReferencePtr changeSecure(bool) const; virtual ReferencePtr changePreferSecure(bool) const; @@ -218,6 +223,7 @@ public: virtual ReferencePtr changeConnectionId(const std::string&) const; virtual ReferencePtr changeCacheConnection(bool) const; virtual ReferencePtr changeEndpointSelection(Ice::EndpointSelectionType) const; + virtual ReferencePtr changeThreadPerConnection(bool) const; virtual Ice::ConnectionIPtr getConnection(bool&) const = 0; @@ -252,6 +258,7 @@ private: bool _compress; // Only used if _overrideCompress == true bool _overrideTimeout; int _timeout; // Only used if _overrideTimeout == true + bool _threadPerConnection; }; class DirectReference : public RoutableReference diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 5240889b537..87b315b9cba 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -45,12 +45,6 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _promote(true), _warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0) { - // - // If we are in thread per connection mode, no thread pool should - // ever be created. - // - assert(!_instance->threadPerConnection()); - SOCKET fds[2]; createPipe(fds); _fdIntrRead = fds[0]; |