diff options
author | Marc Laukien <marc@zeroc.com> | 2006-08-15 18:11:38 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2006-08-15 18:11:38 +0000 |
commit | d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13 (patch) | |
tree | df7c79f1e34a583e555d3c8a5e9efae10374d1ed /cpp/src/Ice/ConnectionI.cpp | |
parent | fixing ref-count bug for Communicator wrapper (diff) | |
download | ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.bz2 ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.xz ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.zip |
started the 'two threads per connection'
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 104 |
1 files changed, 77 insertions, 27 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index f3a0ff704b8..c66e096b376 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -44,8 +44,7 @@ Ice::ConnectionI::validate() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_instance->threadPerConnection() && - _threadPerConnection->getThreadControl() != IceUtil::ThreadControl()) + if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl()) { // // In thread per connection mode, this connection's thread @@ -319,6 +318,7 @@ bool Ice::ConnectionI::isFinished() const { IceUtil::ThreadPtr threadPerConnection; + IceUtil::ThreadPtr secondThreadPerConnection; { // @@ -333,8 +333,17 @@ Ice::ConnectionI::isFinished() const return false; } - if(_transceiver || _dispatchCount != 0 || - (_threadPerConnection && _threadPerConnection->isAlive())) + if(_transceiver || _dispatchCount != 0) + { + return false; + } + + if(_threadPerConnection && _threadPerConnection->isAlive()) + { + return false; + } + + if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive()) { return false; } @@ -343,6 +352,9 @@ Ice::ConnectionI::isFinished() const threadPerConnection = _threadPerConnection; _threadPerConnection = 0; + + secondThreadPerConnection = _secondThreadPerConnection; + _secondThreadPerConnection = 0; } if(threadPerConnection) @@ -350,6 +362,11 @@ Ice::ConnectionI::isFinished() const threadPerConnection->getThreadControl().join(); } + if(secondThreadPerConnection) + { + secondThreadPerConnection->getThreadControl().join(); + } + return true; } @@ -380,6 +397,7 @@ void Ice::ConnectionI::waitUntilFinished() { IceUtil::ThreadPtr threadPerConnection; + IceUtil::ThreadPtr secondThreadPerConnection; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -442,12 +460,20 @@ Ice::ConnectionI::waitUntilFinished() threadPerConnection = _threadPerConnection; _threadPerConnection = 0; + + secondThreadPerConnection = _secondThreadPerConnection; + _secondThreadPerConnection = 0; } if(threadPerConnection) { threadPerConnection->getThreadControl().join(); } + + if(secondThreadPerConnection) + { + secondThreadPerConnection->getThreadControl().join(); + } } void @@ -1239,21 +1265,21 @@ Ice::ConnectionI::createProxy(const Identity& ident) const bool Ice::ConnectionI::datagram() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable. } bool Ice::ConnectionI::readable() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return true; } void Ice::ConnectionI::read(BasicStream& stream) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. _transceiver->read(stream, 0); @@ -1267,7 +1293,7 @@ Ice::ConnectionI::read(BasicStream& stream) void Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. Byte compress = 0; Int requestId = 0; @@ -1321,7 +1347,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) void Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. threadPool->promoteFollower(); @@ -1465,8 +1491,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, } int& compressionLevel = const_cast<int&>(_compressionLevel); - compressionLevel = - _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1); + compressionLevel = \ + _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1); if(compressionLevel < 1) { compressionLevel = 1; @@ -1485,7 +1511,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, __setNoDelete(true); try { - if(!_instance->threadPerConnection()) + if(_instance->threadPerConnection() == 0) { // // Only set _threadPool if we really need it, i.e., if we are @@ -1505,18 +1531,28 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, else { // - // If we are in thread per connection mode, create the thread - // for this connection. + // If we are in thread per connection mode, create the + // thread for this connection. // - _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection = new ThreadPerConnection(this, false); _threadPerConnection->start(_instance->threadPerConnectionStackSize()); + + if(_instance->threadPerConnection() == 2) + { + // + // If we are in two threads per connection mode, + // create the second thread for this connection. + // + _secondThreadPerConnection = new ThreadPerConnection(this, true); + _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize()); + } } } catch(const IceUtil::Exception& ex) { { Error out(_logger); - if(_instance->threadPerConnection()) + if(_instance->threadPerConnection() > 0) { out << "cannot create thread for connection:\n" << ex; } @@ -1549,6 +1585,7 @@ Ice::ConnectionI::~ConnectionI() assert(!_transceiver); assert(_dispatchCount == 0); assert(!_threadPerConnection); + assert(!_secondThreadPerConnection); } void @@ -1649,7 +1686,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { registerWithPool(); } @@ -1666,7 +1703,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { unregisterWithPool(); } @@ -1682,7 +1719,7 @@ Ice::ConnectionI::setState(State state) { return; } - if(!_instance->threadPerConnection()) + if(!_threadPerConnection) { registerWithPool(); // We need to continue to read in closing state. } @@ -1691,7 +1728,7 @@ Ice::ConnectionI::setState(State state) case StateClosed: { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we @@ -1834,7 +1871,7 @@ Ice::ConnectionI::initiateShutdown() const void Ice::ConnectionI::registerWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. if(!_registeredWithPool) { @@ -1846,7 +1883,7 @@ Ice::ConnectionI::registerWithPool() void Ice::ConnectionI::unregisterWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. if(_registeredWithPool) { @@ -2017,8 +2054,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request { // // We don't need to check magic and version here. This has - // already been done by the ThreadPool or ThreadPerConnection, - // which provides us with the stream. + // already been done by the ThreadPool or the + // ThreadPerConnection, which provides us with the stream. // assert(stream.i == stream.b.end()); stream.i = stream.b.begin() + 8; @@ -2564,8 +2601,14 @@ Ice::ConnectionI::run() } } -Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) : - _connection(connection) +void +Ice::ConnectionI::runSecond() +{ +} + +Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) : + _connection(connection), + _second(second) { } @@ -2579,7 +2622,14 @@ Ice::ConnectionI::ThreadPerConnection::run() try { - _connection->run(); + if(_second) + { + _connection->runSecond(); + } + else + { + _connection->run(); + } } catch(const Exception& ex) { |