summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionFactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp338
1 files changed, 30 insertions, 308 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 3b0da54812d..9ee0e53b924 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -52,15 +52,6 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t>
bool
IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const
{
- if(!threadPerConnection && other.threadPerConnection)
- {
- return true;
- }
- else if(other.threadPerConnection < threadPerConnection)
- {
- return false;
- }
-
return connector < other.connector;
}
@@ -126,8 +117,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished()
ConnectionIPtr
IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
- bool threadPerConnection, Ice::EndpointSelectionType selType,
- bool& compress)
+ Ice::EndpointSelectionType selType, bool& compress)
{
assert(!endpts.empty());
@@ -139,7 +129,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
//
// Try to find a connection to one of the given endpoints.
//
- Ice::ConnectionIPtr connection = findConnection(endpoints, threadPerConnection, compress);
+ Ice::ConnectionIPtr connection = findConnection(endpoints, compress);
if(connection)
{
return connection;
@@ -171,7 +161,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
for(vector<ConnectorPtr>::const_iterator r = cons.begin(); r != cons.end(); ++r)
{
assert(*r);
- connectors.push_back(ConnectorInfo(*r, *p, threadPerConnection));
+ connectors.push_back(ConnectorInfo(*r, *p));
}
}
catch(const Ice::LocalException& ex)
@@ -207,21 +197,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
{
try
{
- int timeout;
- if(defaultsAndOverrides->overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides->overrideConnectTimeoutValue;
- }
- else
- {
- //
- // It is not necessary to check for overrideTimeout, the endpoint has already
- // been modified with this override, if set.
- //
- timeout = q->endpoint->timeout();
- }
-
- connection = createConnection(q->connector->connect(timeout), *q);
+ connection = createConnection(q->connector->connect(), *q);
connection->start(0);
if(defaultsAndOverrides->overrideCompress)
@@ -267,7 +243,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
void
IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore,
- bool tpc, Ice::EndpointSelectionType selType,
+ Ice::EndpointSelectionType selType,
const CreateConnectionCallbackPtr& callback)
{
assert(!endpts.empty());
@@ -283,7 +259,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
try
{
bool compress;
- Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress);
+ Ice::ConnectionIPtr connection = findConnection(endpoints, compress);
if(connection)
{
callback->setConnection(connection, compress);
@@ -296,7 +272,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
return;
}
- ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
+ ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType);
cb->getConnectors();
}
@@ -454,7 +430,7 @@ IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr
}
ConnectionIPtr
-IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress)
+IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool& compress)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(_destroyed)
@@ -471,8 +447,7 @@ IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr
for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q)
{
- if(q->second->isActiveOrHolding() &&
- q->second->threadPerConnection() == tpc) // Don't return destroyed or un-validated connections
+ if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections
{
if(defaultsAndOverrides->overrideCompress)
{
@@ -731,9 +706,7 @@ IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& t
throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
}
- Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false),
- 0, ci.threadPerConnection,
- _instance->threadPerConnectionStackSize());
+ Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), 0);
_connections.insert(pair<const ConnectorInfo, ConnectionIPtr>(ci, connection));
_connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(ci.endpoint, connection));
return connection;
@@ -832,11 +805,6 @@ IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex
// If the connection is finished, we remove it right away instead of
// waiting for the reaping.
//
- // NOTE: it's possible for the connection to not be finished yet. That's
- // for instance the case when using thread per connection and if it's the
- // thread which is calling back the outgoing connection factory to notify
- // it of the failure.
- //
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator,
@@ -898,15 +866,12 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const O
const vector<EndpointIPtr>& endpoints,
bool hasMore,
const CreateConnectionCallbackPtr& cb,
- Ice::EndpointSelectionType selType,
- bool threadPerConnection) :
+ Ice::EndpointSelectionType selType) :
_factory(factory),
- _selectorThread(_factory->_instance->selectorThread()),
_endpoints(endpoints),
_hasMore(hasMore),
_callback(cb),
- _selType(selType),
- _threadPerConnection(threadPerConnection)
+ _selType(selType)
{
_endpointsIter = _endpoints.begin();
}
@@ -973,7 +938,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector
for(vector<ConnectorPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
{
- _connectors.push_back(ConnectorInfo(*p, *_endpointsIter, _threadPerConnection));
+ _connectors.push_back(ConnectorInfo(*p, *_endpointsIter));
}
if(++_endpointsIter != _endpoints.end())
@@ -1091,7 +1056,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
try
{
assert(_iter != _connectors.end());
- connection = _factory->createConnection(_iter->connector->connect(0), *_iter);
+ connection = _factory->createConnection(_iter->connector->connect(), *_iter);
connection->start(this);
}
catch(const Ice::LocalException& ex)
@@ -1160,9 +1125,7 @@ IceInternal::IncomingConnectionFactory::waitUntilHolding() const
void
IceInternal::IncomingConnectionFactory::waitUntilFinished()
{
- IceUtil::ThreadPtr threadPerIncomingConnectionFactory;
list<ConnectionIPtr> connections;
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -1175,9 +1138,6 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
wait();
}
- threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory;
- _threadPerIncomingConnectionFactory = 0;
-
//
// Clear the OA. See bug 1673 for the details of why this is necessary.
//
@@ -1190,11 +1150,6 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished()
connections = _connections;
}
- if(threadPerIncomingConnectionFactory)
- {
- threadPerIncomingConnectionFactory->getThreadControl().join();
- }
-
for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
{
@@ -1247,21 +1202,18 @@ IceInternal::IncomingConnectionFactory::flushBatchRequests()
bool
IceInternal::IncomingConnectionFactory::datagram() const
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint->datagram();
}
bool
IceInternal::IncomingConnectionFactory::readable() const
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return false;
}
bool
IceInternal::IncomingConnectionFactory::read(BasicStream&)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
assert(false); // Must not be called, readable() returns false.
return false;
}
@@ -1288,8 +1240,6 @@ private:
void
IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
ConnectionIPtr connection;
{
@@ -1324,7 +1274,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
TransceiverPtr transceiver;
try
{
- transceiver = _acceptor->accept(0);
+ transceiver = _acceptor->accept();
}
catch(const SocketException&)
{
@@ -1351,8 +1301,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
try
{
- assert(!_threadPerConnection);
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, false, 0);
+ connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter);
}
catch(const LocalException& ex)
{
@@ -1384,22 +1333,17 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt
void
IceInternal::IncomingConnectionFactory::finished(const ThreadPoolPtr& threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
threadPool->promoteFollower();
assert(threadPool.get() == dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool().get());
-
- --_finishedCount;
+ assert(_state == StateClosed);
- if(_finishedCount == 0 && _state == StateClosed)
- {
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse();
- _acceptor->close();
- _acceptor = 0;
- notifyAll();
- }
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->decFdsInUse();
+ _acceptor->close();
+ _acceptor = 0;
+ _fd = 0;
+ notifyAll();
}
void
@@ -1471,8 +1415,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
EventHandler(instance),
_endpoint(endpoint),
_adapter(adapter),
- _registeredWithPool(false),
- _finishedCount(0),
_warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
_state(StateHolding)
{
@@ -1489,8 +1431,6 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
}
ObjectAdapterI* adapterImpl = dynamic_cast<ObjectAdapterI*>(_adapter.get());
- _threadPerConnection = adapterImpl->getThreadPerConnection();
- _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize();
const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint));
if(_transceiver)
@@ -1499,8 +1439,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
try
{
- connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection,
- _threadPerConnectionStackSize);
+ connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter);
}
catch(const LocalException&)
{
@@ -1524,33 +1463,15 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance
_acceptor = _endpoint->acceptor(const_cast<EndpointIPtr&>(_endpoint), adapterName);
assert(_acceptor);
_acceptor->listen();
+ _fd = _acceptor->fd();
__setNoDelete(true);
try
{
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we also use
- // one thread per incoming connection factory, that
- // accepts new connections on this endpoint.
- //
- _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(this);
- _threadPerIncomingConnectionFactory->start(_threadPerConnectionStackSize);
- }
- else
- {
- adapterImpl->getThreadPool()->incFdsInUse();
- }
+ adapterImpl->getThreadPool()->incFdsInUse();
}
catch(const IceUtil::Exception& ex)
{
- if(_threadPerConnection)
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for incoming connection factory:\n" << ex;
- }
-
try
{
_acceptor->close();
@@ -1572,7 +1493,6 @@ IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory()
assert(_state == StateClosed);
assert(!_acceptor);
assert(_connections.empty());
- assert(!_threadPerIncomingConnectionFactory);
}
void
@@ -1591,9 +1511,9 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_threadPerConnection && _acceptor)
+ if(_acceptor)
{
- registerWithPool();
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(this);
}
for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate));
break;
@@ -1605,9 +1525,9 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
return;
}
- if(!_threadPerConnection && _acceptor)
+ if(_acceptor)
{
- unregisterWithPool();
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(this);
}
for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold));
break;
@@ -1617,25 +1537,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
{
if(_acceptor)
{
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we connect
- // to our own acceptor, which unblocks our thread per
- // incoming connection factory stuck in accept().
- //
- _acceptor->connectToSelf();
- }
- else
- {
- //
- // Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- registerWithPool();
- unregisterWithPool();
- }
+ dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this);
}
#ifdef _STLP_BEGIN_NAMESPACE
@@ -1654,183 +1556,3 @@ IceInternal::IncomingConnectionFactory::setState(State state)
notifyAll();
}
-void
-IceInternal::IncomingConnectionFactory::registerWithPool()
-{
- assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(_acceptor); // Not for datagram connections.
-
- if(!_registeredWithPool)
- {
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->_register(_acceptor->fd(), this);
- _registeredWithPool = true;
- }
-}
-
-void
-IceInternal::IncomingConnectionFactory::unregisterWithPool()
-{
- assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(_acceptor); // Not for datagram connections.
-
- if(_registeredWithPool)
- {
- dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->unregister(_acceptor->fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
-}
-
-void
-IceInternal::IncomingConnectionFactory::run()
-{
- assert(_acceptor);
-
- while(true)
- {
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
- TransceiverPtr transceiver;
- try
- {
- transceiver = _acceptor->accept(-1);
- }
- catch(const SocketException&)
- {
- // Ignore socket exceptions.
- }
- catch(const TimeoutException&)
- {
- // Ignore timeouts.
- }
- catch(const LocalException& ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
- {
- Warning out(_instance->initializationData().logger);
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
- }
- }
-
- ConnectionIPtr connection;
-
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- while(_state == StateHolding)
- {
- wait();
- }
-
- if(_state == StateClosed)
- {
- if(transceiver)
- {
- try
- {
- transceiver->close();
- }
- catch(const LocalException&)
- {
- // Here we ignore any exceptions in close().
- }
- }
-
- try
- {
- _acceptor->close();
- }
- catch(const LocalException& ex)
- {
- _acceptor = 0;
- notifyAll();
- ex.ice_throw();
- }
-
- _acceptor = 0;
- notifyAll();
- return;
- }
-
- assert(_state == StateActive);
-
- //
- // Reap connections for which destruction has completed.
- //
- _connections.erase(remove_if(_connections.begin(), _connections.end(),
- Ice::constMemFun(&ConnectionI::isFinished)),
- _connections.end());
-
- //
- // Create a connection object for the connection.
- //
- if(!transceiver)
- {
- continue;
- }
-
- try
- {
- connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection,
- _threadPerConnectionStackSize);
- }
- catch(const LocalException& ex)
- {
- try
- {
- transceiver->close();
- }
- catch(const Ice::LocalException&)
- {
- }
-
- if(_warn)
- {
- Warning out(_instance->initializationData().logger);
- out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
- }
- continue;
- }
-
- _connections.push_back(connection);
- }
-
- //
- // In thread-per-connection mode and regardless of the background mode,
- // start() doesn't block. The connection thread is started and takes
- // care of the connection validation and notifies the factory through
- // the callback when it's done.
- //
- connection->start(this);
- }
-}
-
-IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::ThreadPerIncomingConnectionFactory(
- const IncomingConnectionFactoryPtr& factory) :
- _factory(factory)
-{
-}
-
-void
-IceInternal::IncomingConnectionFactory::ThreadPerIncomingConnectionFactory::run()
-{
- try
- {
- _factory->run();
- }
- catch(const std::exception& ex)
- {
- Error out(_factory->_instance->initializationData().logger);
- out << "exception in thread per incoming connection factory:\n" << _factory->toString() << ex.what();
- }
- catch(...)
- {
- Error out(_factory->_instance->initializationData().logger);
- out << "unknown exception in thread per incoming connection factory:\n" << _factory->toString();
- }
-
- _factory = 0; // Resolve cyclic dependency.
-}