diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 317 |
1 files changed, 170 insertions, 147 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index d9ea5f2849f..7088fe259a4 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -94,119 +94,132 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() ConnectionPtr IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpoints) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + ConnectionPtr connection; - if(!_instance) { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - - assert(!endpoints.empty()); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Reap connections for which destruction has completed. - // - std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin(); - while(p != _connections.end()) - { - if(p->second->isFinished()) - { - _connections.erase(p++); - } - else + if(!_instance) { - ++p; + throw CommunicatorDestroyedException(__FILE__, __LINE__); } - } - // - // Search for existing connections. - // - DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - vector<EndpointPtr>::const_iterator q; - for(q = endpoints.begin(); q != endpoints.end(); ++q) - { - EndpointPtr endpoint = *q; - if(defaultsAndOverrides->overrideTimeout) - { - endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); - } + assert(!endpoints.empty()); - map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint); - if(r != _connections.end()) + // + // Reap connections for which destruction has completed. + // + std::map<EndpointPtr, ConnectionPtr>::iterator p = _connections.begin(); + while(p != _connections.end()) { - // - // Don't return connections for which destruction has been - // initiated. - // - if(!r->second->isDestroyed()) + if(p->second->isFinished()) { - return r->second; + _connections.erase(p++); + } + else + { + ++p; } } - } - // - // No connections exist, try to create one. - // - TraceLevelsPtr traceLevels = _instance->traceLevels(); - LoggerPtr logger = _instance->logger(); - - ConnectionPtr connection; - auto_ptr<LocalException> exception; - q = endpoints.begin(); - while(q != endpoints.end()) - { - EndpointPtr endpoint = *q; - if(defaultsAndOverrides->overrideTimeout) + // + // Search for existing connections. + // + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + vector<EndpointPtr>::const_iterator q; + for(q = endpoints.begin(); q != endpoints.end(); ++q) { - endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); - } + EndpointPtr endpoint = *q; + if(defaultsAndOverrides->overrideTimeout) + { + endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); + } - try - { - TransceiverPtr transceiver = endpoint->clientTransceiver(); - if(!transceiver) + map<EndpointPtr, ConnectionPtr>::const_iterator r = _connections.find(endpoint); + if(r != _connections.end()) { - ConnectorPtr connector = endpoint->connector(); - assert(connector); - transceiver = connector->connect(endpoint->timeout()); - assert(transceiver); - } - connection = new Connection(_instance, transceiver, endpoint, 0); - connection->validate(); - connection->activate(); - _connections.insert(make_pair(endpoint, connection)); - break; - } - catch(const LocalException& ex) - { - exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); + // + // Don't return connections for which destruction has been + // initiated. + // + if(!r->second->isDestroyed()) + { + connection = r->second; + break; + } + } } - ++q; - - if(traceLevels->retry >= 2) + // + // No connections exist, try to create one. + // + if(!connection) { - Trace out(logger, traceLevels->retryCat); - out << "connection to endpoint failed"; - if(q != endpoints.end()) + TraceLevelsPtr traceLevels = _instance->traceLevels(); + LoggerPtr logger = _instance->logger(); + + auto_ptr<LocalException> exception; + q = endpoints.begin(); + while(q != endpoints.end()) { - out << ", trying next endpoint\n"; + EndpointPtr endpoint = *q; + if(defaultsAndOverrides->overrideTimeout) + { + endpoint = endpoint->timeout(defaultsAndOverrides->overrideTimeoutValue); + } + + try + { + TransceiverPtr transceiver = endpoint->clientTransceiver(); + if(!transceiver) + { + ConnectorPtr connector = endpoint->connector(); + assert(connector); + transceiver = connector->connect(endpoint->timeout()); + assert(transceiver); + } + connection = new Connection(_instance, transceiver, endpoint, 0); + _connections.insert(make_pair(endpoint, connection)); + break; + } + catch(const LocalException& ex) + { + exception = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); + } + + ++q; + + if(traceLevels->retry >= 2) + { + Trace out(logger, traceLevels->retryCat); + out << "connection to endpoint failed"; + if(q != endpoints.end()) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + out << *exception.get(); + } } - else + + if(!connection) { - out << " and no more endpoints to try\n"; + assert(exception.get()); + exception->ice_throw(); } - out << *exception.get(); } } - if(!connection) - { - assert(exception.get()); - exception->ice_throw(); - } + // + // We validate and activate outside the thread synchronization, to + // not block the factory. + // + assert(connection); + connection->validate(); + connection->activate(); return connection; } @@ -397,80 +410,90 @@ IceInternal::IncomingConnectionFactory::read(BasicStream&) void IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPtr& threadPool) { - ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); - - if(_state != StateActive) - { - IceUtil::ThreadControl::yield(); - threadPool->promoteFollower(); - return; - } - - // - // Reap connections for which destruction has completed. - // - _connections.erase(remove_if(_connections.begin(), _connections.end(), - ::Ice::constMemFun(&Connection::isFinished)), - _connections.end()); + ConnectionPtr connection; - // - // Now accept a new connection. - // - TransceiverPtr transceiver; - try - { - transceiver = _acceptor->accept(0); - } - catch(const SocketException&) - { - // TODO: bandaid. Takes care of SSL Handshake problems during - // creation of a Transceiver. Ignore, nothing we can do here. - threadPool->promoteFollower(); - return; - } - catch(const TimeoutException&) - { - // Ignore timeouts. - threadPool->promoteFollower(); - return; - } - catch(const LocalException& ex) { - // Warn about other Ice local exceptions. - if(_warn) + ::IceUtil::Monitor< ::IceUtil::Mutex>::Lock sync(*this); + + if(_state != StateActive) { - Warning out(_instance->logger()); - out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + IceUtil::ThreadControl::yield(); + threadPool->promoteFollower(); + return; } + + // + // Reap connections for which destruction has completed. + // + _connections.erase(remove_if(_connections.begin(), _connections.end(), + ::Ice::constMemFun(&Connection::isFinished)), + _connections.end()); + + // + // Now accept a new connection. + // + TransceiverPtr transceiver; + try + { + transceiver = _acceptor->accept(0); + } + catch(const SocketException&) + { + // TODO: bandaid. Takes care of SSL Handshake problems during + // creation of a Transceiver. Ignore, nothing we can do here. + threadPool->promoteFollower(); + return; + } + catch(const TimeoutException&) + { + // Ignore timeouts. + threadPool->promoteFollower(); + return; + } + catch(const LocalException& ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + Warning out(_instance->logger()); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + threadPool->promoteFollower(); + return; + } + catch(...) + { + threadPool->promoteFollower(); + throw; + } + + // + // We must promote a follower after we accepted a new connection. + // threadPool->promoteFollower(); - return; - } - catch(...) - { - threadPool->promoteFollower(); - throw; + + // + // Create a connection object for the connection. + // + assert(transceiver); + connection = new Connection(_instance, transceiver, _endpoint, _adapter); + _connections.push_back(connection); } - - // - // We must promote a follower after we accepted a new connection. - // - threadPool->promoteFollower(); - + // - // Create and activate a connection object for the connection. + // We validate and activate outside the thread synchronization, to + // not block the factory. // try { - assert(transceiver); - ConnectionPtr connection = new Connection(_instance, transceiver, _endpoint, _adapter); + assert(connection); connection->validate(); - connection->activate(); - _connections.push_back(connection); + connection->activate(); // The factory must be active at this point, so we activate the connection, too. } catch(const LocalException&) { // - // Ignore all exceptions while creating or activating the + // Ignore all exceptions while activating or validating the // connection object. Warning or error messages for such // exceptions must be printed directly in the connection // object code. |