diff options
author | Mark Spruiell <mes@zeroc.com> | 2007-01-25 16:50:20 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2007-01-25 16:50:20 +0000 |
commit | 2af1be4b75d36ed2022c304c9030ff34162d44db (patch) | |
tree | 229182241b85b3dd9b57cf56a02e774b83fcd47a /cpp/src/Ice/ConnectionFactory.cpp | |
parent | IceGrid file cache fixes (diff) | |
download | ice-2af1be4b75d36ed2022c304c9030ff34162d44db.tar.bz2 ice-2af1be4b75d36ed2022c304c9030ff34162d44db.tar.xz ice-2af1be4b75d36ed2022c304c9030ff34162d44db.zip |
adding thread-per-connection settings for proxies & OAs
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 65 |
1 files changed, 39 insertions, 26 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index fd1a188387c..b82c780e1a2 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -93,7 +93,8 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } ConnectionIPtr -IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, bool& compress) +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, + bool threadPerConnection, bool& compress) { assert(!endpts.empty()); vector<EndpointIPtr> endpoints = endpts; @@ -158,9 +159,11 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt { // // Don't return connections for which destruction has - // been initiated. + // been initiated. The connection must also match the + // requested thread-per-connection setting. // - if(!pr.first->second->isDestroyed()) + if(!pr.first->second->isDestroyed() && + pr.first->second->threadPerConnection() == threadPerConnection) { if(_instance->defaultsAndOverrides()->overrideCompress) { @@ -222,11 +225,13 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt while(pr.first != pr.second) { - // - // Don't return connections for which destruction has - // been initiated. - // - if(!pr.first->second->isDestroyed()) + // + // Don't return connections for which destruction has + // been initiated. The connection must also match the + // requested thread-per-connection setting. + // + if(!pr.first->second->isDestroyed() && + pr.first->second->threadPerConnection() == threadPerConnection) { if(_instance->defaultsAndOverrides()->overrideCompress) { @@ -287,7 +292,8 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt transceiver = connector->connect(timeout); assert(transceiver); } - connection = new ConnectionI(_instance, transceiver, endpoint, 0); + connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection, + _instance->threadPerConnectionStackSize()); connection->validate(); if(_instance->defaultsAndOverrides()->overrideCompress) @@ -655,21 +661,21 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests() bool IceInternal::IncomingConnectionFactory::datagram() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); } bool IceInternal::IncomingConnectionFactory::readable() const { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return false; } void IceInternal::IncomingConnectionFactory::read(BasicStream&) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(false); // Must not be called. } @@ -695,7 +701,7 @@ private: void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. ConnectionIPtr connection; @@ -758,7 +764,8 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); } catch(const LocalException&) { @@ -792,7 +799,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt void IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool) { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -854,6 +861,10 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); } + ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get()); + _threadPerConnection = adapterImpl->getThreadPerConnection(); + _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize(); + const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint)); if(_transceiver) { @@ -861,7 +872,8 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance try { - connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); connection->validate(); } catch(const LocalException&) @@ -889,7 +901,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance __setNoDelete(true); try { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we also use @@ -897,16 +909,16 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance // accepts new connections on this endpoint. // _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); - _threadPerIncomingConnectionFactory->start(_instance->threadPerConnectionStackSize()); + _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize); } else { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->incFdsInUse(); + adapterImpl->getThreadPool()->incFdsInUse(); } } catch(const IceUtil::Exception& ex) { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { Error out(_instance->initializationData().logger); out << "cannot create thread for incoming connection factory:\n" << ex; @@ -952,7 +964,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection() && _acceptor) + if(!_threadPerConnection && _acceptor) { registerWithPool(); } @@ -966,7 +978,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_instance->threadPerConnection() && _acceptor) + if(!_threadPerConnection && _acceptor) { unregisterWithPool(); } @@ -978,7 +990,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_acceptor) { - if(_instance->threadPerConnection()) + if(_threadPerConnection) { // // If we are in thread per connection mode, we connect @@ -1018,7 +1030,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) void IceInternal::IncomingConnectionFactory::registerWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(_acceptor); // Not for datagram connections. if(!_registeredWithPool) @@ -1031,7 +1043,7 @@ IceInternal::IncomingConnectionFactory::registerWithPool() void IceInternal::IncomingConnectionFactory::unregisterWithPool() { - assert(!_instance->threadPerConnection()); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. assert(_acceptor); // Not for datagram connections. if(_registeredWithPool) @@ -1132,7 +1144,8 @@ IceInternal::IncomingConnectionFactory::run() { try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); } catch(const LocalException&) { |