diff options
author | Marc Laukien <marc@zeroc.com> | 2006-08-15 18:11:38 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2006-08-15 18:11:38 +0000 |
commit | d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13 (patch) | |
tree | df7c79f1e34a583e555d3c8a5e9efae10374d1ed /cpp/src | |
parent | fixing ref-count bug for Communicator wrapper (diff) | |
download | ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.bz2 ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.xz ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.zip |
started the 'two threads per connection'
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 104 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 17 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.h | 4 |
4 files changed, 98 insertions, 34 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index f3a0ff704b8..c66e096b376 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -44,8 +44,7 @@ Ice::ConnectionI::validate() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_instance->threadPerConnection() && - _threadPerConnection->getThreadControl() != IceUtil::ThreadControl()) + if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl()) { // // In thread per connection mode, this connection's thread @@ -319,6 +318,7 @@ bool Ice::ConnectionI::isFinished() const { IceUtil::ThreadPtr threadPerConnection; + IceUtil::ThreadPtr secondThreadPerConnection; { // @@ -333,8 +333,17 @@ Ice::ConnectionI::isFinished() const return false; } - if(_transceiver || _dispatchCount != 0 || - (_threadPerConnection && _threadPerConnection->isAlive())) + if(_transceiver || _dispatchCount != 0) + { + return false; + } + + if(_threadPerConnection && _threadPerConnection->isAlive()) + { + return false; + } + + if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive()) { return false; } @@ -343,6 +352,9 @@ Ice::ConnectionI::isFinished() const threadPerConnection = _threadPerConnection; _threadPerConnection = 0; + + secondThreadPerConnection = _secondThreadPerConnection; + _secondThreadPerConnection = 0; } if(threadPerConnection) @@ -350,6 +362,11 @@ Ice::ConnectionI::isFinished() const threadPerConnection->getThreadControl().join(); } + if(secondThreadPerConnection) + { + secondThreadPerConnection->getThreadControl().join(); + } + return true; } @@ -380,6 +397,7 @@ void Ice::ConnectionI::waitUntilFinished() { IceUtil::ThreadPtr threadPerConnection; + IceUtil::ThreadPtr secondThreadPerConnection; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -442,12 +460,20 @@ Ice::ConnectionI::waitUntilFinished() threadPerConnection = _threadPerConnection; _threadPerConnection = 0; + + secondThreadPerConnection = _secondThreadPerConnection; + _secondThreadPerConnection = 0; } if(threadPerConnection) { threadPerConnection->getThreadControl().join(); } + + if(secondThreadPerConnection) + { + secondThreadPerConnection->getThreadControl().join(); + } } void @@ -1239,21 +1265,21 @@ Ice::ConnectionI::createProxy(const Identity& ident) const bool Ice::ConnectionI::datagram() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable. } bool Ice::ConnectionI::readable() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return true; } void Ice::ConnectionI::read(BasicStream& stream) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. _transceiver->read(stream, 0); @@ -1267,7 +1293,7 @@ Ice::ConnectionI::read(BasicStream& stream) void Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. Byte compress = 0; Int requestId = 0; @@ -1321,7 +1347,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) void Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. threadPool->promoteFollower(); @@ -1465,8 +1491,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, } int& compressionLevel = const_cast<int&>(_compressionLevel); - compressionLevel = - _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1); + compressionLevel = \ + _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1); if(compressionLevel < 1) { compressionLevel = 1; @@ -1485,7 +1511,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, __setNoDelete(true); try { - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection() == 0) { // // Only set _threadPool if we really need it, i.e., if we are @@ -1505,18 +1531,28 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, else { // - // If we are in thread per connection mode, create the thread - // for this connection. + // If we are in thread per connection mode, create the + // thread for this connection. // - _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection = new ThreadPerConnection(this, false); _threadPerConnection->start(_instance->threadPerConnectionStackSize()); + + if(_instance->threadPerConnection() == 2) + { + // + // If we are in two threads per connection mode, + // create the second thread for this connection. + // + _secondThreadPerConnection = new ThreadPerConnection(this, true); + _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize()); + } } } catch(const IceUtil::Exception& ex) { { Error out(_logger); - if(_instance->threadPerConnection()) + if(_instance->threadPerConnection() > 0) { out << "cannot create thread for connection:\n" << ex; } @@ -1549,6 +1585,7 @@ Ice::ConnectionI::~ConnectionI() assert(!_transceiver); assert(_dispatchCount == 0); assert(!_threadPerConnection); + assert(!_secondThreadPerConnection); } void @@ -1649,7 +1686,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { registerWithPool(); } @@ -1666,7 +1703,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { unregisterWithPool(); } @@ -1682,7 +1719,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { registerWithPool(); // We need to continue to read in closing state. } @@ -1691,7 +1728,7 @@ Ice::ConnectionI::setState(State state) case StateClosed: { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we @@ -1834,7 +1871,7 @@ Ice::ConnectionI::initiateShutdown() const void Ice::ConnectionI::registerWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. if(!_registeredWithPool) { @@ -1846,7 +1883,7 @@ Ice::ConnectionI::registerWithPool() void Ice::ConnectionI::unregisterWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. if(_registeredWithPool) { @@ -2017,8 +2054,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { // // We don't need to check magic and version here. This has - // already been done by the ThreadPool or ThreadPerConnection, - // which provides us with the stream. + // already been done by the ThreadPool or the + // ThreadPerConnection, which provides us with the stream. // assert(stream.i == stream.b.end()); stream.i = stream.b.begin() + 8; @@ -2564,8 +2601,14 @@ Ice::ConnectionI::run() } } -Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) : - _connection(connection) +void +Ice::ConnectionI::runSecond() +{ +} + +Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) : + _connection(connection), + _second(second) { } @@ -2579,7 +2622,14 @@ Ice::ConnectionI::ThreadPerConnection::run() try { - _connection->run(); + if(_second) + { + _connection->runSecond(); + } + else + { + _connection->run(); + } } catch(const Exception& ex) { diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index efd576c0fc5..7bbfa011d1b 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -131,22 +131,25 @@ private: void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); - void run(); + void run(); // For thread per connection. + void runSecond(); // For second thread per connection. class ThreadPerConnection : public IceUtil::Thread { public: - ThreadPerConnection(const ConnectionIPtr&); + ThreadPerConnection(const ConnectionIPtr&, bool); virtual void run(); private: ConnectionIPtr _connection; + const bool _second; }; friend class ThreadPerConnection; // Defined as mutable because "isFinished() const" sets this to 0. mutable IceUtil::ThreadPtr _threadPerConnection; + mutable IceUtil::ThreadPtr _secondThreadPerConnection; IceInternal::TransceiverPtr _transceiver; const std::string _desc; diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 031d3a723bd..d72faa0b850 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -232,7 +232,7 @@ IceInternal::Instance::serverThreadPool() return _serverThreadPool; } -bool +int IceInternal::Instance::threadPerConnection() const { // No mutex lock, immutable. @@ -457,7 +457,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _messageSizeMax(0), _clientACM(0), _serverACM(0), - _threadPerConnection(false), + _threadPerConnection(0), _threadPerConnectionStackSize(0), _defaultContext(new SharedContext(initData.defaultContext)) { @@ -629,7 +629,18 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi const_cast<Int&>(_clientACM) = _initData.properties->getPropertyAsIntWithDefault("Ice.ACM.Client", 60); const_cast<Int&>(_serverACM) = _initData.properties->getPropertyAsInt("Ice.ACM.Server"); - const_cast<bool&>(_threadPerConnection) = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection") > 0; + { + Int threadPerConnection = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection"); + if(threadPerConnection < 0) + { + threadPerConnection = 0; + } + if(threadPerConnection > 2) + { + threadPerConnection = 2; + } + const_cast<Int&>(_threadPerConnection) = threadPerConnection; + } { Int stackSize = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index 3bfe37a1f63..b9bbca6a9fa 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -65,7 +65,7 @@ public: ObjectAdapterFactoryPtr objectAdapterFactory() const; ThreadPoolPtr clientThreadPool(); ThreadPoolPtr serverThreadPool(); - bool threadPerConnection() const; + Ice::Int threadPerConnection() const; size_t threadPerConnectionStackSize() const; EndpointFactoryManagerPtr endpointFactoryManager() const; DynamicLibraryListPtr dynamicLibraryList() const; @@ -110,7 +110,7 @@ private: ObjectAdapterFactoryPtr _objectAdapterFactory; ThreadPoolPtr _clientThreadPool; ThreadPoolPtr _serverThreadPool; - const bool _threadPerConnection; + const Ice::Int _threadPerConnection; const size_t _threadPerConnectionStackSize; EndpointFactoryManagerPtr _endpointFactoryManager; DynamicLibraryListPtr _dynamicLibraryList; |