summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp104
1 files changed, 77 insertions, 27 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index f3a0ff704b8..c66e096b376 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -44,8 +44,7 @@ Ice::ConnectionI::validate()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_instance->threadPerConnection() &&
- _threadPerConnection->getThreadControl() != IceUtil::ThreadControl())
+ if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl())
{
//
// In thread per connection mode, this connection's thread
@@ -319,6 +318,7 @@ bool
Ice::ConnectionI::isFinished() const
{
IceUtil::ThreadPtr threadPerConnection;
+ IceUtil::ThreadPtr secondThreadPerConnection;
{
//
@@ -333,8 +333,17 @@ Ice::ConnectionI::isFinished() const
return false;
}
- if(_transceiver || _dispatchCount != 0 ||
- (_threadPerConnection && _threadPerConnection->isAlive()))
+ if(_transceiver || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ if(_threadPerConnection && _threadPerConnection->isAlive())
+ {
+ return false;
+ }
+
+ if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive())
{
return false;
}
@@ -343,6 +352,9 @@ Ice::ConnectionI::isFinished() const
threadPerConnection = _threadPerConnection;
_threadPerConnection = 0;
+
+ secondThreadPerConnection = _secondThreadPerConnection;
+ _secondThreadPerConnection = 0;
}
if(threadPerConnection)
@@ -350,6 +362,11 @@ Ice::ConnectionI::isFinished() const
threadPerConnection->getThreadControl().join();
}
+ if(secondThreadPerConnection)
+ {
+ secondThreadPerConnection->getThreadControl().join();
+ }
+
return true;
}
@@ -380,6 +397,7 @@ void
Ice::ConnectionI::waitUntilFinished()
{
IceUtil::ThreadPtr threadPerConnection;
+ IceUtil::ThreadPtr secondThreadPerConnection;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -442,12 +460,20 @@ Ice::ConnectionI::waitUntilFinished()
threadPerConnection = _threadPerConnection;
_threadPerConnection = 0;
+
+ secondThreadPerConnection = _secondThreadPerConnection;
+ _secondThreadPerConnection = 0;
}
if(threadPerConnection)
{
threadPerConnection->getThreadControl().join();
}
+
+ if(secondThreadPerConnection)
+ {
+ secondThreadPerConnection->getThreadControl().join();
+ }
}
void
@@ -1239,21 +1265,21 @@ Ice::ConnectionI::createProxy(const Identity& ident) const
bool
Ice::ConnectionI::datagram() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable.
}
bool
Ice::ConnectionI::readable() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return true;
}
void
Ice::ConnectionI::read(BasicStream& stream)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
_transceiver->read(stream, 0);
@@ -1267,7 +1293,7 @@ Ice::ConnectionI::read(BasicStream& stream)
void
Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
Byte compress = 0;
Int requestId = 0;
@@ -1321,7 +1347,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
void
Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
threadPool->promoteFollower();
@@ -1465,8 +1491,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
}
int& compressionLevel = const_cast<int&>(_compressionLevel);
- compressionLevel =
- _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
+ compressionLevel = \
+ _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
if(compressionLevel < 1)
{
compressionLevel = 1;
@@ -1485,7 +1511,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
__setNoDelete(true);
try
{
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection() == 0)
{
//
// Only set _threadPool if we really need it, i.e., if we are
@@ -1505,18 +1531,28 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
else
{
//
- // If we are in thread per connection mode, create the thread
- // for this connection.
+ // If we are in thread per connection mode, create the
+ // thread for this connection.
//
- _threadPerConnection = new ThreadPerConnection(this);
+ _threadPerConnection = new ThreadPerConnection(this, false);
_threadPerConnection->start(_instance->threadPerConnectionStackSize());
+
+ if(_instance->threadPerConnection() == 2)
+ {
+ //
+ // If we are in two threads per connection mode,
+ // create the second thread for this connection.
+ //
+ _secondThreadPerConnection = new ThreadPerConnection(this, true);
+ _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize());
+ }
}
}
catch(const IceUtil::Exception& ex)
{
{
Error out(_logger);
- if(_instance->threadPerConnection())
+ if(_instance->threadPerConnection() > 0)
{
out << "cannot create thread for connection:\n" << ex;
}
@@ -1549,6 +1585,7 @@ Ice::ConnectionI::~ConnectionI()
assert(!_transceiver);
assert(_dispatchCount == 0);
assert(!_threadPerConnection);
+ assert(!_secondThreadPerConnection);
}
void
@@ -1649,7 +1686,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
registerWithPool();
}
@@ -1666,7 +1703,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
unregisterWithPool();
}
@@ -1682,7 +1719,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
registerWithPool(); // We need to continue to read in closing state.
}
@@ -1691,7 +1728,7 @@ Ice::ConnectionI::setState(State state)
case StateClosed:
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we
@@ -1834,7 +1871,7 @@ Ice::ConnectionI::initiateShutdown() const
void
Ice::ConnectionI::registerWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
if(!_registeredWithPool)
{
@@ -1846,7 +1883,7 @@ Ice::ConnectionI::registerWithPool()
void
Ice::ConnectionI::unregisterWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
if(_registeredWithPool)
{
@@ -2017,8 +2054,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
//
// We don't need to check magic and version here. This has
- // already been done by the ThreadPool or ThreadPerConnection,
- // which provides us with the stream.
+ // already been done by the ThreadPool or the
+ // ThreadPerConnection, which provides us with the stream.
//
assert(stream.i == stream.b.end());
stream.i = stream.b.begin() + 8;
@@ -2564,8 +2601,14 @@ Ice::ConnectionI::run()
}
}
-Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) :
- _connection(connection)
+void
+Ice::ConnectionI::runSecond()
+{
+}
+
+Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) :
+ _connection(connection),
+ _second(second)
{
}
@@ -2579,7 +2622,14 @@ Ice::ConnectionI::ThreadPerConnection::run()
try
{
- _connection->run();
+ if(_second)
+ {
+ _connection->runSecond();
+ }
+ else
+ {
+ _connection->run();
+ }
}
catch(const Exception& ex)
{