summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2006-08-15 18:11:38 +0000
committerMarc Laukien <marc@zeroc.com>2006-08-15 18:11:38 +0000
commitd3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13 (patch)
treedf7c79f1e34a583e555d3c8a5e9efae10374d1ed /cpp/src
parentfixing ref-count bug for Communicator wrapper (diff)
downloadice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.bz2
ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.tar.xz
ice-d3f3dff08b1302845a3d4ab08fd7e7fb7ecaaa13.zip
started the 'two threads per connection'
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp104
-rw-r--r--cpp/src/Ice/ConnectionI.h7
-rw-r--r--cpp/src/Ice/Instance.cpp17
-rw-r--r--cpp/src/Ice/Instance.h4
4 files changed, 98 insertions, 34 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index f3a0ff704b8..c66e096b376 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -44,8 +44,7 @@ Ice::ConnectionI::validate()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_instance->threadPerConnection() &&
- _threadPerConnection->getThreadControl() != IceUtil::ThreadControl())
+ if(_threadPerConnection && _threadPerConnection->getThreadControl() != IceUtil::ThreadControl())
{
//
// In thread per connection mode, this connection's thread
@@ -319,6 +318,7 @@ bool
Ice::ConnectionI::isFinished() const
{
IceUtil::ThreadPtr threadPerConnection;
+ IceUtil::ThreadPtr secondThreadPerConnection;
{
//
@@ -333,8 +333,17 @@ Ice::ConnectionI::isFinished() const
return false;
}
- if(_transceiver || _dispatchCount != 0 ||
- (_threadPerConnection && _threadPerConnection->isAlive()))
+ if(_transceiver || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ if(_threadPerConnection && _threadPerConnection->isAlive())
+ {
+ return false;
+ }
+
+ if(_secondThreadPerConnection && _secondThreadPerConnection->isAlive())
{
return false;
}
@@ -343,6 +352,9 @@ Ice::ConnectionI::isFinished() const
threadPerConnection = _threadPerConnection;
_threadPerConnection = 0;
+
+ secondThreadPerConnection = _secondThreadPerConnection;
+ _secondThreadPerConnection = 0;
}
if(threadPerConnection)
@@ -350,6 +362,11 @@ Ice::ConnectionI::isFinished() const
threadPerConnection->getThreadControl().join();
}
+ if(secondThreadPerConnection)
+ {
+ secondThreadPerConnection->getThreadControl().join();
+ }
+
return true;
}
@@ -380,6 +397,7 @@ void
Ice::ConnectionI::waitUntilFinished()
{
IceUtil::ThreadPtr threadPerConnection;
+ IceUtil::ThreadPtr secondThreadPerConnection;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -442,12 +460,20 @@ Ice::ConnectionI::waitUntilFinished()
threadPerConnection = _threadPerConnection;
_threadPerConnection = 0;
+
+ secondThreadPerConnection = _secondThreadPerConnection;
+ _secondThreadPerConnection = 0;
}
if(threadPerConnection)
{
threadPerConnection->getThreadControl().join();
}
+
+ if(secondThreadPerConnection)
+ {
+ secondThreadPerConnection->getThreadControl().join();
+ }
}
void
@@ -1239,21 +1265,21 @@ Ice::ConnectionI::createProxy(const Identity& ident) const
bool
Ice::ConnectionI::datagram() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram(); // No mutex protection necessary, _endpoint is immutable.
}
bool
Ice::ConnectionI::readable() const
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
return true;
}
void
Ice::ConnectionI::read(BasicStream& stream)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
_transceiver->read(stream, 0);
@@ -1267,7 +1293,7 @@ Ice::ConnectionI::read(BasicStream& stream)
void
Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
Byte compress = 0;
Int requestId = 0;
@@ -1321,7 +1347,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool)
void
Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
threadPool->promoteFollower();
@@ -1465,8 +1491,8 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
}
int& compressionLevel = const_cast<int&>(_compressionLevel);
- compressionLevel =
- _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
+ compressionLevel = \
+ _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
if(compressionLevel < 1)
{
compressionLevel = 1;
@@ -1485,7 +1511,7 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
__setNoDelete(true);
try
{
- if(!_instance->threadPerConnection())
+ if(_instance->threadPerConnection() == 0)
{
//
// Only set _threadPool if we really need it, i.e., if we are
@@ -1505,18 +1531,28 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance,
else
{
//
- // If we are in thread per connection mode, create the thread
- // for this connection.
+ // If we are in thread per connection mode, create the
+ // thread for this connection.
//
- _threadPerConnection = new ThreadPerConnection(this);
+ _threadPerConnection = new ThreadPerConnection(this, false);
_threadPerConnection->start(_instance->threadPerConnectionStackSize());
+
+ if(_instance->threadPerConnection() == 2)
+ {
+ //
+ // If we are in two threads per connection mode,
+ // create the second thread for this connection.
+ //
+ _secondThreadPerConnection = new ThreadPerConnection(this, true);
+ _secondThreadPerConnection->start(_instance->threadPerConnectionStackSize());
+ }
}
}
catch(const IceUtil::Exception& ex)
{
{
Error out(_logger);
- if(_instance->threadPerConnection())
+ if(_instance->threadPerConnection() > 0)
{
out << "cannot create thread for connection:\n" << ex;
}
@@ -1549,6 +1585,7 @@ Ice::ConnectionI::~ConnectionI()
assert(!_transceiver);
assert(_dispatchCount == 0);
assert(!_threadPerConnection);
+ assert(!_secondThreadPerConnection);
}
void
@@ -1649,7 +1686,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
registerWithPool();
}
@@ -1666,7 +1703,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
unregisterWithPool();
}
@@ -1682,7 +1719,7 @@ Ice::ConnectionI::setState(State state)
{
return;
}
- if(!_instance->threadPerConnection())
+ if(!_threadPerConnection)
{
registerWithPool(); // We need to continue to read in closing state.
}
@@ -1691,7 +1728,7 @@ Ice::ConnectionI::setState(State state)
case StateClosed:
{
- if(_instance->threadPerConnection())
+ if(_threadPerConnection)
{
//
// If we are in thread per connection mode, we
@@ -1834,7 +1871,7 @@ Ice::ConnectionI::initiateShutdown() const
void
Ice::ConnectionI::registerWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
if(!_registeredWithPool)
{
@@ -1846,7 +1883,7 @@ Ice::ConnectionI::registerWithPool()
void
Ice::ConnectionI::unregisterWithPool()
{
- assert(!_instance->threadPerConnection()); // Only for use with a thread pool.
+ assert(!_threadPerConnection); // Only for use with a thread pool.
if(_registeredWithPool)
{
@@ -2017,8 +2054,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
{
//
// We don't need to check magic and version here. This has
- // already been done by the ThreadPool or ThreadPerConnection,
- // which provides us with the stream.
+ // already been done by the ThreadPool or the
+ // ThreadPerConnection, which provides us with the stream.
//
assert(stream.i == stream.b.end());
stream.i = stream.b.begin() + 8;
@@ -2564,8 +2601,14 @@ Ice::ConnectionI::run()
}
}
-Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection) :
- _connection(connection)
+void
+Ice::ConnectionI::runSecond()
+{
+}
+
+Ice::ConnectionI::ThreadPerConnection::ThreadPerConnection(const ConnectionIPtr& connection, bool second) :
+ _connection(connection),
+ _second(second)
{
}
@@ -2579,7 +2622,14 @@ Ice::ConnectionI::ThreadPerConnection::run()
try
{
- _connection->run();
+ if(_second)
+ {
+ _connection->runSecond();
+ }
+ else
+ {
+ _connection->run();
+ }
}
catch(const Exception& ex)
{
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index efd576c0fc5..7bbfa011d1b 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -131,22 +131,25 @@ private:
void invokeAll(IceInternal::BasicStream&, Int, Int, Byte,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
- void run();
+ void run(); // For thread per connection.
+ void runSecond(); // For second thread per connection.
class ThreadPerConnection : public IceUtil::Thread
{
public:
- ThreadPerConnection(const ConnectionIPtr&);
+ ThreadPerConnection(const ConnectionIPtr&, bool);
virtual void run();
private:
ConnectionIPtr _connection;
+ const bool _second;
};
friend class ThreadPerConnection;
// Defined as mutable because "isFinished() const" sets this to 0.
mutable IceUtil::ThreadPtr _threadPerConnection;
+ mutable IceUtil::ThreadPtr _secondThreadPerConnection;
IceInternal::TransceiverPtr _transceiver;
const std::string _desc;
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index 031d3a723bd..d72faa0b850 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -232,7 +232,7 @@ IceInternal::Instance::serverThreadPool()
return _serverThreadPool;
}
-bool
+int
IceInternal::Instance::threadPerConnection() const
{
// No mutex lock, immutable.
@@ -457,7 +457,7 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
_messageSizeMax(0),
_clientACM(0),
_serverACM(0),
- _threadPerConnection(false),
+ _threadPerConnection(0),
_threadPerConnectionStackSize(0),
_defaultContext(new SharedContext(initData.defaultContext))
{
@@ -629,7 +629,18 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
const_cast<Int&>(_clientACM) = _initData.properties->getPropertyAsIntWithDefault("Ice.ACM.Client", 60);
const_cast<Int&>(_serverACM) = _initData.properties->getPropertyAsInt("Ice.ACM.Server");
- const_cast<bool&>(_threadPerConnection) = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection") > 0;
+ {
+ Int threadPerConnection = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection");
+ if(threadPerConnection < 0)
+ {
+ threadPerConnection = 0;
+ }
+ if(threadPerConnection > 2)
+ {
+ threadPerConnection = 2;
+ }
+ const_cast<Int&>(_threadPerConnection) = threadPerConnection;
+ }
{
Int stackSize = _initData.properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize");
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index 3bfe37a1f63..b9bbca6a9fa 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -65,7 +65,7 @@ public:
ObjectAdapterFactoryPtr objectAdapterFactory() const;
ThreadPoolPtr clientThreadPool();
ThreadPoolPtr serverThreadPool();
- bool threadPerConnection() const;
+ Ice::Int threadPerConnection() const;
size_t threadPerConnectionStackSize() const;
EndpointFactoryManagerPtr endpointFactoryManager() const;
DynamicLibraryListPtr dynamicLibraryList() const;
@@ -110,7 +110,7 @@ private:
ObjectAdapterFactoryPtr _objectAdapterFactory;
ThreadPoolPtr _clientThreadPool;
ThreadPoolPtr _serverThreadPool;
- const bool _threadPerConnection;
+ const Ice::Int _threadPerConnection;
const size_t _threadPerConnectionStackSize;
EndpointFactoryManagerPtr _endpointFactoryManager;
DynamicLibraryListPtr _dynamicLibraryList;