diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 338 |
1 files changed, 30 insertions, 308 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 3b0da54812d..9ee0e53b924 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -52,15 +52,6 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> bool IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const { - if(!threadPerConnection && other.threadPerConnection) - { - return true; - } - else if(other.threadPerConnection < threadPerConnection) - { - return false; - } - return connector < other.connector; } @@ -126,8 +117,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() ConnectionIPtr IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, - bool threadPerConnection, Ice::EndpointSelectionType selType, - bool& compress) + Ice::EndpointSelectionType selType, bool& compress) { assert(!endpts.empty()); @@ -139,7 +129,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // // Try to find a connection to one of the given endpoints. // - Ice::ConnectionIPtr connection = findConnection(endpoints, threadPerConnection, compress); + Ice::ConnectionIPtr connection = findConnection(endpoints, compress); if(connection) { return connection; @@ -171,7 +161,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt for(vector<ConnectorPtr>::const_iterator r = cons.begin(); r != cons.end(); ++r) { assert(*r); - connectors.push_back(ConnectorInfo(*r, *p, threadPerConnection)); + connectors.push_back(ConnectorInfo(*r, *p)); } } catch(const Ice::LocalException& ex) @@ -207,21 +197,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt { try { - int timeout; - if(defaultsAndOverrides->overrideConnectTimeout) - { - timeout = defaultsAndOverrides->overrideConnectTimeoutValue; - } - else - { - // - // It is not necessary to check for overrideTimeout, the endpoint has already - // been modified with this override, if set. - // - timeout = q->endpoint->timeout(); - } - - connection = createConnection(q->connector->connect(timeout), *q); + connection = createConnection(q->connector->connect(), *q); connection->start(0); if(defaultsAndOverrides->overrideCompress) @@ -267,7 +243,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt void IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, - bool tpc, Ice::EndpointSelectionType selType, + Ice::EndpointSelectionType selType, const CreateConnectionCallbackPtr& callback) { assert(!endpts.empty()); @@ -283,7 +259,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt try { bool compress; - Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress); + Ice::ConnectionIPtr connection = findConnection(endpoints, compress); if(connection) { callback->setConnection(connection, compress); @@ -296,7 +272,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt return; } - ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); + ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType); cb->getConnectors(); } @@ -454,7 +430,7 @@ IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr } ConnectionIPtr -IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress) +IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool& compress) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_destroyed) @@ -471,8 +447,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) { - if(q->second->isActiveOrHolding() && - q->second->threadPerConnection() == tpc) // Don't return destroyed or un-validated connections + if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections { if(defaultsAndOverrides->overrideCompress) { @@ -731,9 +706,7 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); } - Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), - 0, ci.threadPerConnection, - _instance->threadPerConnectionStackSize()); + Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), 0); _connections.insert(pair<const ConnectorInfo, ConnectionIPtr>(ci, connection)); _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(ci.endpoint, connection)); return connection; @@ -832,11 +805,6 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex // If the connection is finished, we remove it right away instead of // waiting for the reaping. // - // NOTE: it's possible for the connection to not be finished yet. That's - // for instance the case when using thread per connection and if it's the - // thread which is calling back the outgoing connection factory to notify - // it of the failure. - // { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, @@ -898,15 +866,12 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O const vector<EndpointIPtr>& endpoints, bool hasMore, const CreateConnectionCallbackPtr& cb, - Ice::EndpointSelectionType selType, - bool threadPerConnection) : + Ice::EndpointSelectionType selType) : _factory(factory), - _selectorThread(_factory->_instance->selectorThread()), _endpoints(endpoints), _hasMore(hasMore), _callback(cb), - _selType(selType), - _threadPerConnection(threadPerConnection) + _selType(selType) { _endpointsIter = _endpoints.begin(); } @@ -973,7 +938,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector for(vector<ConnectorPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) { - _connectors.push_back(ConnectorInfo(*p, *_endpointsIter, _threadPerConnection)); + _connectors.push_back(ConnectorInfo(*p, *_endpointsIter)); } if(++_endpointsIter != _endpoints.end()) @@ -1091,7 +1056,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() try { assert(_iter != _connectors.end()); - connection = _factory->createConnection(_iter->connector->connect(0), *_iter); + connection = _factory->createConnection(_iter->connector->connect(), *_iter); connection->start(this); } catch(const Ice::LocalException& ex) @@ -1160,9 +1125,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const void IceInternal::IncomingConnectionFactory::waitUntilFinished() { - IceUtil::ThreadPtr threadPerIncomingConnectionFactory; list<ConnectionIPtr> connections; - { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -1175,9 +1138,6 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() wait(); } - threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; - _threadPerIncomingConnectionFactory = 0; - // // Clear the OA. See bug 1673 for the details of why this is necessary. // @@ -1190,11 +1150,6 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() connections = _connections; } - if(threadPerIncomingConnectionFactory) - { - threadPerIncomingConnectionFactory->getThreadControl().join(); - } - for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); { @@ -1247,21 +1202,18 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests() bool IceInternal::IncomingConnectionFactory::datagram() const { - assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint->datagram(); } bool IceInternal::IncomingConnectionFactory::readable() const { - assert(!_threadPerConnection); // Only for use with a thread pool. return false; } bool IceInternal::IncomingConnectionFactory::read(BasicStream&) { - assert(!_threadPerConnection); // Only for use with a thread pool. assert(false); // Must not be called, readable() returns false. return false; } @@ -1288,8 +1240,6 @@ private: void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - ConnectionIPtr connection; { @@ -1324,7 +1274,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt TransceiverPtr transceiver; try { - transceiver = _acceptor->accept(0); + transceiver = _acceptor->accept(); } catch(const SocketException&) { @@ -1351,8 +1301,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt try { - assert(!_threadPerConnection); - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, false, 0); + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter); } catch(const LocalException& ex) { @@ -1384,22 +1333,17 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt void IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); threadPool->promoteFollower(); assert(threadPool.get() == dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool().get()); - - --_finishedCount; + assert(_state == StateClosed); - if(_finishedCount == 0 && _state == StateClosed) - { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse(); - _acceptor->close(); - _acceptor = 0; - notifyAll(); - } + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse(); + _acceptor->close(); + _acceptor = 0; + _fd = 0; + notifyAll(); } void @@ -1471,8 +1415,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance EventHandler(instance), _endpoint(endpoint), _adapter(adapter), - _registeredWithPool(false), - _finishedCount(0), _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), _state(StateHolding) { @@ -1489,8 +1431,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance } ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get()); - _threadPerConnection = adapterImpl->getThreadPerConnection(); - _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize(); const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint)); if(_transceiver) @@ -1499,8 +1439,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance try { - connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, - _threadPerConnectionStackSize); + connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter); } catch(const LocalException&) { @@ -1524,33 +1463,15 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName); assert(_acceptor); _acceptor->listen(); + _fd = _acceptor->fd(); __setNoDelete(true); try { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we also use - // one thread per incoming connection factory, that - // accepts new connections on this endpoint. - // - _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this); - _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize); - } - else - { - adapterImpl->getThreadPool()->incFdsInUse(); - } + adapterImpl->getThreadPool()->incFdsInUse(); } catch(const IceUtil::Exception& ex) { - if(_threadPerConnection) - { - Error out(_instance->initializationData().logger); - out << "cannot create thread for incoming connection factory:\n" << ex; - } - try { _acceptor->close(); @@ -1572,7 +1493,6 @@ IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() assert(_state == StateClosed); assert(!_acceptor); assert(_connections.empty()); - assert(!_threadPerIncomingConnectionFactory); } void @@ -1591,9 +1511,9 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_threadPerConnection && _acceptor) + if(_acceptor) { - registerWithPool(); + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(this); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); break; @@ -1605,9 +1525,9 @@ IceInternal::IncomingConnectionFactory::setState(State state) { return; } - if(!_threadPerConnection && _acceptor) + if(_acceptor) { - unregisterWithPool(); + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(this); } for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); break; @@ -1617,25 +1537,7 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_acceptor) { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we connect - // to our own acceptor, which unblocks our thread per - // incoming connection factory stuck in accept(). - // - _acceptor->connectToSelf(); - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - } + dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this); } #ifdef _STLP_BEGIN_NAMESPACE @@ -1654,183 +1556,3 @@ IceInternal::IncomingConnectionFactory::setState(State state) notifyAll(); } -void -IceInternal::IncomingConnectionFactory::registerWithPool() -{ - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor); // Not for datagram connections. - - if(!_registeredWithPool) - { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this); - _registeredWithPool = true; - } -} - -void -IceInternal::IncomingConnectionFactory::unregisterWithPool() -{ - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor); // Not for datagram connections. - - if(_registeredWithPool) - { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. - } -} - -void -IceInternal::IncomingConnectionFactory::run() -{ - assert(_acceptor); - - while(true) - { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(-1); - } - catch(const SocketException&) - { - // Ignore socket exceptions. - } - catch(const TimeoutException&) - { - // Ignore timeouts. - } - catch(const LocalException& ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); - } - } - - ConnectionIPtr connection; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateHolding) - { - wait(); - } - - if(_state == StateClosed) - { - if(transceiver) - { - try - { - transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - } - - try - { - _acceptor->close(); - } - catch(const LocalException& ex) - { - _acceptor = 0; - notifyAll(); - ex.ice_throw(); - } - - _acceptor = 0; - notifyAll(); - return; - } - - assert(_state == StateActive); - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - Ice::constMemFun(&ConnectionI::isFinished)), - _connections.end()); - - // - // Create a connection object for the connection. - // - if(!transceiver) - { - continue; - } - - try - { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, - _threadPerConnectionStackSize); - } - catch(const LocalException& ex) - { - try - { - transceiver->close(); - } - catch(const Ice::LocalException&) - { - } - - if(_warn) - { - Warning out(_instance->initializationData().logger); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); - } - continue; - } - - _connections.push_back(connection); - } - - // - // In thread-per-connection mode and regardless of the background mode, - // start() doesn't block. The connection thread is started and takes - // care of the connection validation and notifies the factory through - // the callback when it's done. - // - connection->start(this); - } -} - -IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::ThreadPerIncomingConnectionFactory( - const IncomingConnectionFactoryPtr& factory) : - _factory(factory) -{ -} - -void -IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::run() -{ - try - { - _factory->run(); - } - catch(const std::exception& ex) - { - Error out(_factory->_instance->initializationData().logger); - out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what(); - } - catch(...) - { - Error out(_factory->_instance->initializationData().logger); - out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString(); - } - - _factory = 0; // Resolve cyclic dependency. -} |