diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 1212 |
1 files changed, 910 insertions, 302 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index af2b506193d..6d69a36cb84 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; } namespace { + struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> { ptrdiff_t operator()(ptrdiff_t d) @@ -45,6 +46,22 @@ struct RandomNumberGenerator : public std::unary_function<ptrdiff_t, ptrdiff_t> return IceUtil::random(static_cast<int>(d)); } }; + +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator<(const ConnectorInfo& other) const +{ + if(!threadPerConnection && other.threadPerConnection) + { + return true; + } + else if(other.threadPerConnection < threadPerConnection) + { + return false; + } + + return connector < other.connector; } void @@ -60,11 +77,11 @@ IceInternal::OutgoingConnectionFactory::destroy() #ifdef _STLP_BEGIN_NAMESPACE // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> + voidbind2nd(Ice::secondVoidMemFun1<ConnectorInfo, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #else for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> + bind2nd(Ice::secondVoidMemFun1<const ConnectorInfo, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #endif @@ -75,7 +92,7 @@ IceInternal::OutgoingConnectionFactory::destroy() void IceInternal::OutgoingConnectionFactory::waitUntilFinished() { - multimap<ConnectorPtr, ConnectionIPtr> connections; + multimap<ConnectorInfo, ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -85,7 +102,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() // until there are no pending connections anymore. Only then // we can be sure the _connections contains all connections. // - while(!_destroyed || !_pending.empty()) + while(!_destroyed || !_pending.empty() || !_pendingEndpoints.empty()) { wait(); } @@ -94,311 +111,187 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() // We want to wait until all connections are finished outside the // thread synchronization. // - connections.swap(_connections); + connections = _connections; } for_each(connections.begin(), connections.end(), - Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); + Ice::secondVoidMemFun<const ConnectorInfo, ConnectionI>(&ConnectionI::waitUntilFinished)); + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _connections.clear(); + } } ConnectionIPtr -IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool moreEndpts, +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, bool threadPerConnection, Ice::EndpointSelectionType selType, bool& compress) { assert(!endpts.empty()); - vector<pair<ConnectorPtr, EndpointIPtr> > connectors; + + // + // Apply the overrides. + // + vector<EndpointIPtr> endpoints = applyOverrides(endpts); + // + // Try to find a connection to one of the given endpoints. + // + Ice::ConnectionIPtr connection = findConnection(endpoints, threadPerConnection, compress); + if(connection) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + return connection; + } - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } + auto_ptr<Ice::LocalException> exception; + // + // If we didn't find a connection with the endpoints, we create the connectors + // for the endpoints. + // + vector<ConnectorInfo> connectors; + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { // - // Reap connections for which destruction has completed. + // Create connectors for the endpoint. // - std::multimap<ConnectorPtr, ConnectionIPtr>::iterator p = _connections.begin(); - while(p != _connections.end()) - { - if(p->second->isFinished()) - { - _connections.erase(p++); - } - else - { - ++p; - } - } - - vector<EndpointIPtr> endpoints = endpts; - vector<EndpointIPtr>::iterator q; - for(q = endpoints.begin(); q != endpoints.end(); ++q) + try { - // - // Modify endpoints with overrides. - // - if(_instance->defaultsAndOverrides()->overrideTimeout) - { - *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); - } - - // - // Create connectors for the endpoints. - // - vector<ConnectorPtr> cons = (*q)->connectors(); - assert(cons.size() > 0); - - // - // Shuffle connectors is endpoint selection type is Random. - // + vector<ConnectorPtr> cons = (*p)->connectors(); + assert(!cons.empty()); + if(selType == Random) { RandomNumberGenerator rng; random_shuffle(cons.begin(), cons.end(), rng); } - - vector<ConnectorPtr>::const_iterator r; - for(r = cons.begin(); r != cons.end(); ++r) - { - connectors.push_back(make_pair(*r, *q)); - } - } - - // - // Search for existing connections. - // - vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator r; - for(r = connectors.begin(); r != connectors.end(); ++r) - { - pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator, - multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first); - while(pr.first != pr.second) + for(vector<ConnectorPtr>::const_iterator r = cons.begin(); r != cons.end(); ++r) { - // - // Don't return connections for which destruction has - // been initiated. The connection must also match the - // requested thread-per-connection setting. - // - if(!pr.first->second->isDestroyed() && - pr.first->second->threadPerConnection() == threadPerConnection) - { - if(_instance->defaultsAndOverrides()->overrideCompress) - { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; - } - else - { - compress = (*r).second->compress(); - } - - return pr.first->second; - } - - ++pr.first; + assert(*r); + connectors.push_back(ConnectorInfo(*r, *p, threadPerConnection)); } } - - // - // If some other thread is currently trying to establish a - // connection to any of our endpoints, we wait until this - // thread is finished. - // - bool searchAgain = false; - while(!_destroyed) + catch(const Ice::LocalException& ex) { - for(r = connectors.begin(); r != connectors.end(); ++r) - { - if(_pending.find((*r).first) != _pending.end()) - { - break; - } - } - - if(r == connectors.end()) - { - break; - } - - searchAgain = true; - - wait(); - } - - if(_destroyed) - { - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - - // - // Search for existing connections again if we waited above, - // as new connections might have been added in the meantime. - // - if(searchAgain) - { - for(r = connectors.begin(); r != connectors.end(); ++r) - { - pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator, - multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first); - - while(pr.first != pr.second) - { - // - // Don't return connections for which destruction has - // been initiated. The connection must also match the - // requested thread-per-connection setting. - // - if(!pr.first->second->isDestroyed() && - pr.first->second->threadPerConnection() == threadPerConnection) - { - if(_instance->defaultsAndOverrides()->overrideCompress) - { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; - } - else - { - compress = (*r).second->compress(); - } - - return pr.first->second; - } - - ++pr.first; - } - } - } - - // - // No connection to any of our endpoints exists yet, so we - // will try to create one. To avoid that other threads try to - // create connections to the same endpoints, we add our - // endpoints to _pending. - // - for(r = connectors.begin(); r != connectors.end(); ++r) - { - _pending.insert((*r).first); + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleException(ex, hasMore || p != endpoints.end() - 1); } } - ConnectorPtr connector; - ConnectionIPtr connection; - auto_ptr<LocalException> exception; + if(connectors.empty()) + { + assert(exception.get()); + exception->ice_throw(); + } - vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator q; - for(q = connectors.begin(); q != connectors.end(); ++q) + // + // Try to get a connection to one of the connectors. A null result indicates that no + // connection was found and that we should try to establish the connection (and that + // the connectors were added to _pending to prevent other threads from establishing + // the connection). + // + connection = getConnection(connectors, 0, compress); + if(connection) + { + return connection; + } + + // + // Try to establish the connection to the connectors. + // + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) { - connector = (*q).first; - EndpointIPtr endpoint = (*q).second; - try { - int timeout; - if(_instance->defaultsAndOverrides()->overrideConnectTimeout) + if(defaultsAndOverrides->overrideConnectTimeout) { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + timeout = defaultsAndOverrides->overrideConnectTimeoutValue; } - // It is not necessary to check for overrideTimeout, - // the endpoint has already been modified with this - // override, if set. else { - timeout = endpoint->timeout(); + // + // It is not necessary to check for overrideTimeout, the endpoint has already + // been modified with this override, if set. + // + timeout = p->endpoint->timeout(); } - TransceiverPtr transceiver = connector->connect(timeout); - assert(transceiver); - - connection = new ConnectionI(_instance, transceiver, endpoint->compress(false), 0, threadPerConnection, - _instance->threadPerConnectionStackSize()); - connection->start(); - connection->validate(); + connection = createConnection(p->connector->connect(timeout), *p); + connection->start(0); - if(_instance->defaultsAndOverrides()->overrideCompress) + if(defaultsAndOverrides->overrideCompress) { - compress = _instance->defaultsAndOverrides()->overrideCompressValue; + compress = defaultsAndOverrides->overrideCompressValue; } else { - compress = endpoint->compress(); + compress = p->endpoint->compress(); } + break; } - catch(const LocalException& ex) + catch(const Ice::CommunicatorDestroyedException& ex) { - exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - - // - // If a connection object was constructed, then validate() - // must have raised the exception. - // - if(connection) - { - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - connection = 0; - } + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleException(*exception.get(), *p, connection, hasMore || p != connectors.end() - 1); + connection = 0; + break; // No need to continue } - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->retry >= 2) + catch(const Ice::LocalException& ex) { - Trace out(_instance->initializationData().logger, traceLevels->retryCat); - - out << "connection to endpoint failed"; - if(moreEndpts || q + 1 != connectors.end()) - { - out << ", trying next endpoint\n"; - } - else - { - out << " and no more endpoints to try\n"; - } - out << *exception.get(); + exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleException(*exception.get(), *p, connection, hasMore || p != connectors.end() - 1); + connection = 0; } } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Signal other threads that we are done with trying to - // establish connections to our endpoints. - // - for(q = connectors.begin(); q != connectors.end(); ++q) - { - _pending.erase((*q).first); - } - notifyAll(); - if(!connection) - { - assert(exception.get()); - exception->ice_throw(); - } - else - { - _connections.insert(_connections.end(), pair<const ConnectorPtr, ConnectionIPtr>(connector, connection)); + // + // Finish creating the connection (this removes the connectors from the _pending + // list and notifies any waiting threads). + // + finishGetConnection(connectors, 0, connection); - if(_destroyed) - { - connection->destroy(ConnectionI::CommunicatorDestroyed); - throw CommunicatorDestroyedException(__FILE__, __LINE__); - } - else - { - connection->activate(); - } - } + if(!connection) + { + assert(exception.get()); + exception->ice_throw(); } - assert(connection); return connection; } void +IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, bool hasMore, + bool tpc, Ice::EndpointSelectionType selType, + const CreateConnectionCallbackPtr& callback) +{ + assert(!endpts.empty()); + + // + // Apply the overrides. + // + vector<EndpointIPtr> endpoints = applyOverrides(endpts); + + // + // Try to find a connection to one of the given endpoints. + // + bool compress; + Ice::ConnectionIPtr connection = findConnection(endpoints, tpc, compress); + if(connection) + { + callback->setConnection(connection, compress); + return; + } + + ConnectCallbackPtr cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); + cb->getConnection(); +} + +void IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -442,14 +335,14 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route // endpoint = endpoint->compress(false); - multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q; + multimap<ConnectorInfo, ConnectionIPtr>::const_iterator q; for(q = _connections.begin(); q != _connections.end(); ++q) { - if((*q).second->endpoint() == endpoint) + if(q->second->endpoint() == endpoint) { try { - (*q).second->setAdapter(adapter); + q->second->setAdapter(adapter); } catch(const Ice::LocalException&) { @@ -472,7 +365,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad return; } - for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) + for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { if(p->second->getAdapter() == adapter) { @@ -497,9 +390,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - for(std::multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); - p != _connections.end(); + for(multimap<ConnectorInfo, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { c.push_back(p->second); @@ -531,6 +422,668 @@ IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory() assert(_connections.empty()); } +vector<EndpointIPtr> +IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr>& endpts) +{ + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + vector<EndpointIPtr> endpoints = endpts; + for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + // + // Modify endpoints with overrides. + // + if(defaultsAndOverrides->overrideTimeout) + { + *p = (*p)->timeout(defaultsAndOverrides->overrideTimeoutValue); + } + } + return endpoints; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool tpc, bool& compress) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + assert(!endpoints.empty()); + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, + multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connectionsByEndpoint.equal_range(*p); + + for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) + { + if(q->second->isActiveOrHolding() && + q->second->threadPerConnection() == tpc) // Don't return destroyed or un-validated connections + { + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = (*p)->compress(); + } + return q->second; + } + } + } + return 0; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress) +{ + DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(*p); + + if(pr.first == pr.second) + { + continue; + } + + for(multimap<ConnectorInfo, ConnectionIPtr>::iterator q = pr.first; q != pr.second; ++q) + { + if(q->second->isActiveOrHolding()) // Don't return destroyed or un-validated connections + { + if(q->second->endpoint() != p->endpoint) + { + _connectionsByEndpoint.insert(make_pair(p->endpoint, q->second)); + } + + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = p->endpoint->compress(); + } + return q->second; + } + } + } + + return 0; +} + +void +IceInternal::OutgoingConnectionFactory::addPendingEndpoints(const vector<EndpointIPtr>& endpoints) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + _pendingEndpoints.insert(endpoints.begin(), endpoints.end()); +} + +void +IceInternal::OutgoingConnectionFactory::removePendingEndpoints(const vector<EndpointIPtr>& endpoints) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + assert(_pendingEndpoints.find(*p) != _pendingEndpoints.end()); + _pendingEndpoints.erase(_pendingEndpoints.find(*p)); + } + + if(_destroyed) + { + notifyAll(); + } +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, + const ConnectCallbackPtr& cb, bool& compress) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // Reap connections for which destruction has completed. + // + multimap<ConnectorInfo, ConnectionIPtr>::iterator p = _connections.begin(); + while(p != _connections.end()) + { + if(p->second->isFinished()) + { + _connections.erase(p++); + } + else + { + ++p; + } + } + + multimap<EndpointIPtr, ConnectionIPtr>::iterator q = _connectionsByEndpoint.begin(); + while(q != _connectionsByEndpoint.end()) + { + if(q->second->isFinished()) + { + _connectionsByEndpoint.erase(q++); + } + else + { + ++q; + } + } + + // + // Try to get the connection. We may need to wait for other threads to + // finish if one of them is currently establishing a connection to one + // of our connectors. + // + while(!_destroyed) + { + // + // Search for a matching connection. If we find one, we're done. + // + Ice::ConnectionIPtr connection = findConnection(connectors, compress); + if(connection) + { + if(cb) + { + // + // This might not be the first getConnection call for the callback. We need + // to ensure that the callback isn't registered with any other pending + // connectors since we just found a connection and therefore don't need to + // wait anymore for other pending connectors. + // + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + q->second.erase(cb); + } + } + } + return connection; + } + + // + // Determine whether another thread is currently attempting to connect to one of our endpoints; + // if so we wait until it's done. + // + bool found = false; + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + if(q != _pending.end()) + { + found = true; + if(cb) + { + q->second.insert(cb); // Add the callback to each pending connector. + } + } + } + + if(!found) + { + // + // If no thread is currently establishing a connection to one of our connectors, + // we get out of this loop and start the connection establishment to one of the + // given connectors. + // + break; + } + else + { + // + // If a callback is not specified we wait until another thread notifies us about a + // change to the pending list. Otherwise, if a callback is provided we're done: + // when the pending list changes the callback will be notified and will try to + // get the connection again. + // + if(!cb) + { + wait(); + } + else + { + return 0; + } + } + } + + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + // + // No connection to any of our endpoints exists yet; we add the given connectors to + // the _pending set to indicate that we're attempting connection establishment to + // these connectors. + // + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + assert(_pending.find(*p) == _pending.end()); + _pending.insert(pair<ConnectorInfo, set<ConnectCallbackPtr> >(*p, set<ConnectCallbackPtr>())); + } + } + + // + // At this point, we're responsible for establishing the connection to one of + // the given connectors. If it's a non-blocking connect, calling nextConnector + // will start the connection establishment. Otherwise, we return null to get + // the caller to establish the connection. + // + if(cb) + { + cb->nextConnector(); + } + + return 0; +} + +ConnectionIPtr +IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_pending.find(ci) != _pending.end() && transceiver); + + // + // Create and add the connection to the connection map. Adding the connection to the map + // is necessary to support the interruption of the connection initialization and validation + // in case the communicator is destroyed. + // + try + { + if(_destroyed) + { + throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); + } + + Ice::ConnectionIPtr connection = new ConnectionI(_instance, transceiver, ci.endpoint->compress(false), + 0, ci.threadPerConnection, + _instance->threadPerConnectionStackSize()); + _connections.insert(make_pair(ci, connection)); + _connectionsByEndpoint.insert(make_pair(ci.endpoint, connection)); + return connection; + } + catch(const Ice::LocalException&) + { + try + { + transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Ignore + } + throw; + } +} + +void +IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, + const ConnectCallbackPtr& cb, + const ConnectionIPtr& connection) +{ + vector<ConnectCallbackPtr> callbacks; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // We're done trying to connect to the given connectors so we remove the + // connectors from the pending list and notify waiting threads. We also + // notify the pending connect callbacks (outside the synchronization). + // + + for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) + { + map<ConnectorInfo, set<ConnectCallbackPtr> >::iterator q = _pending.find(*p); + assert(q != _pending.end()); + callbacks.insert(callbacks.end(), q->second.begin(), q->second.end()); + _pending.erase(q); + } + notifyAll(); + + // + // If the connect attempt succeeded and the communicator is not destroyed, + // activate the connection! + // + if(connection && !_destroyed) + { + connection->activate(); + } + } + + // + // Notify any waiting callbacks. + // + for(vector<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) + { + (*p)->getConnection(); + } +} + +void +IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, const ConnectorInfo& ci, + const ConnectionIPtr& connection, bool hasMore) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(_instance->initializationData().logger, traceLevels->retryCat); + + out << "connection to endpoint failed"; + if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) + { + out << "\n"; + } + else + { + if(hasMore) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + } + out << ex; + } + + if(connection && connection->isFinished()) + { + // + // If the connection is finished, we remove it right away instead of + // waiting for the reaping. + // + // NOTE: it's possible for the connection to not be finished yet. That's + // for instance the case when using thread per connection and if it's the + // thread which is calling back the outgoing connection factory to notify + // it of the failure. + // + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + pair<multimap<ConnectorInfo, ConnectionIPtr>::iterator, + multimap<ConnectorInfo, ConnectionIPtr>::iterator> pr = _connections.equal_range(ci); + + for(multimap<ConnectorInfo, ConnectionIPtr>::iterator p = pr.first; p != pr.second; ++p) + { + if(p->second == connection) + { + _connections.erase(p); + break; + } + } + + pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, + multimap<EndpointIPtr, ConnectionIPtr>::iterator> qr = _connectionsByEndpoint.equal_range(ci.endpoint); + + for(multimap<EndpointIPtr, ConnectionIPtr>::iterator q = qr.first; q != qr.second; ++q) + { + if(q->second == connection) + { + _connectionsByEndpoint.erase(q); + break; + } + } + } + } +} + +void +IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore) +{ + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->retry >= 2) + { + Trace out(_instance->initializationData().logger, traceLevels->retryCat); + + out << "couldn't resolve endpoint host"; + if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) + { + out << "\n"; + } + else + { + if(hasMore) + { + out << ", trying next endpoint\n"; + } + else + { + out << " and no more endpoints to try\n"; + } + } + out << ex; + } +} + +IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const OutgoingConnectionFactoryPtr& factory, + const vector<EndpointIPtr>& endpoints, + bool hasMore, + const CreateConnectionCallbackPtr& cb, + Ice::EndpointSelectionType selType, + bool threadPerConnection) : + _factory(factory), + _selectorThread(_factory->_instance->selectorThread()), + _endpoints(endpoints), + _hasMore(hasMore), + _callback(cb), + _selType(selType), + _threadPerConnection(threadPerConnection) +{ + _endpointsIter = _endpoints.begin(); +} + +// +// Methods from ConnectionI.StartCallback +// +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) +{ + assert(!_exception.get() && connection == _connection); + + bool compress; + DefaultsAndOverridesPtr defaultsAndOverrides = _factory->_instance->defaultsAndOverrides(); + if(defaultsAndOverrides->overrideCompress) + { + compress = defaultsAndOverrides->overrideCompressValue; + } + else + { + compress = _iter->endpoint->compress(); + } + + _factory->finishGetConnection(_connectors, this, connection); + _factory->removePendingEndpoints(_endpoints); + _callback->setConnection(connection, compress); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& connection, + const LocalException& ex) +{ + assert(!_exception.get() && connection == _connection); + + _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + handleException(); +} + +// +// Methods from EndpointI_connectors +// +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector<ConnectorPtr>& connectors) +{ + vector<ConnectorPtr> cons = connectors; + if(_selType == Random) + { + RandomNumberGenerator rng; + random_shuffle(cons.begin(), cons.end(), rng); + } + + for(vector<ConnectorPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) + { + _connectors.push_back(ConnectorInfo(*p, *_endpointsIter, _threadPerConnection)); + } + + if(++_endpointsIter != _endpoints.end()) + { + (*_endpointsIter)->connectors_async(this); + } + else + { + assert(!_connectors.empty()); + + // + // We now have all the connectors for the given endpoints. We can try to obtain the + // connection. + // + _iter = _connectors.begin(); + getConnection(); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::LocalException& ex) +{ + _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1); + if(++_endpointsIter != _endpoints.end()) + { + (*_endpointsIter)->connectors_async(this); + } + else if(!_connectors.empty()) + { + // + // We now have all the connectors for the given endpoints. We can try to obtain the + // connection. + // + _iter = _connectors.begin(); + getConnection(); + } + else + { + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + _factory->_instance->clientThreadPool()->execute(this); + } +} + +// +// Methods from ThreadPoolWorkItem +// +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::execute(const ThreadPoolPtr& threadPool) +{ + threadPool->promoteFollower(); + assert(_exception.get()); + _factory->removePendingEndpoints(_endpoints); + _callback->setException(*_exception.get()); +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() +{ + // + // First, get the connectors for all the endpoints. + // + if(_endpointsIter != _endpoints.end()) + { + try + { + _factory->addPendingEndpoints(_endpoints); + (*_endpointsIter)->connectors_async(this); + } + catch(const Ice::LocalException& ex) + { + _callback->setException(ex); + } + return; + } + + try + { + // + // If all the connectors have been created, we ask the factory to get a + // connection. + // + bool compress; + Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, this, compress); + if(!connection) + { + // + // A null return value from getConnection indicates that the connection + // is being established and that everthing has been done to ensure that + // the callback will be notified when the connection establishment is + // done. + // + return; + } + + _factory->removePendingEndpoints(_endpoints); + _callback->setConnection(connection, compress); + } + catch(const Ice::LocalException& ex) + { + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + _factory->_instance->clientThreadPool()->execute(this); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() +{ + try + { + _exception.reset(0); + _connection = _factory->createConnection(_iter->connector->connect(0), *_iter); + _connection->start(this); + } + catch(const Ice::LocalException& ex) + { + _exception.reset(dynamic_cast<Ice::LocalException*>(ex.ice_clone())); + handleException(); + } +} + +void +IceInternal::OutgoingConnectionFactory::ConnectCallback::handleException() +{ + assert(_iter != _connectors.end() && _exception.get()); + + _factory->handleException(*_exception.get(), *_iter, _connection, _hasMore || _iter != _connectors.end() - 1); + if(dynamic_cast<Ice::CommunicatorDestroyedException*>(_exception.get())) // No need to continue. + { + _factory->finishGetConnection(_connectors, this, 0); + _factory->removePendingEndpoints(_endpoints); + _callback->setException(*_exception.get()); + } + else if(++_iter != _connectors.end()) // Try the next connector. + { + nextConnector(); + } + else + { + _factory->finishGetConnection(_connectors, this, 0); + _factory->removePendingEndpoints(_endpoints); + _callback->setException(*_exception.get()); + } +} + +bool +IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const +{ + return this < &rhs; +} + void IceInternal::IncomingConnectionFactory::activate() { @@ -612,7 +1165,7 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() // We want to wait until all connections are finished outside the // thread synchronization. // - connections.swap(_connections); + connections = _connections; } if(threadPerIncomingConnectionFactory) @@ -621,6 +1174,11 @@ IceInternal::IncomingConnectionFactory::waitUntilFinished() } for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + _connections.clear(); + } } EndpointIPtr @@ -641,7 +1199,7 @@ IceInternal::IncomingConnectionFactory::connections() const // Only copy connections which have not been destroyed. // remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), - Ice::constMemFun(&ConnectionI::isDestroyed)); + not1(Ice::constMemFun(&ConnectionI::isActiveOrHolding))); return result; } @@ -678,11 +1236,12 @@ IceInternal::IncomingConnectionFactory::readable() const return false; } -void +bool IceInternal::IncomingConnectionFactory::read(BasicStream&) { assert(!_threadPerConnection); // Only for use with a thread pool. - assert(false); // Must not be called. + assert(false); // Must not be called, readable() returns false. + return false; } class PromoteFollower @@ -772,10 +1331,23 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt { assert(!_threadPerConnection); connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, false, 0); - connection->start(); } - catch(const LocalException&) + catch(const LocalException& ex) { + try + { + transceiver->close(); + } + catch(const Ice::LocalException&) + { + // Ignore. + } + + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } return; } @@ -784,23 +1356,7 @@ IceInternal::IncomingConnectionFactory::message(BasicStream&, const ThreadPoolPt assert(connection); - // - // We validate outside the thread synchronization, to not block - // the factory. - // - try - { - connection->validate(); - } - catch(const LocalException&) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - _connections.remove(connection); - return; - } - - connection->activate(); + connection->start(this); } void @@ -844,6 +1400,48 @@ IceInternal::IncomingConnectionFactory::toString() const return _acceptor->toString(); } +void +IceInternal::IncomingConnectionFactory::connectionStartCompleted(const Ice::ConnectionIPtr& connection) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // Initialy, connections are in the holding state. If the factory is active + // we activate the connection. + // + if(_state == StateActive) + { + connection->activate(); + } +} + +void +IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::ConnectionIPtr& connection, + const Ice::LocalException& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state == StateClosed) + { + return; + } + + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + + // + // If the connection is finished, remove it right away from + // the connection map. Otherwise, we keep it in the map, it + // will eventually be reaped. + // + if(connection->isFinished()) + { + _connections.remove(connection); + } +} + IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance, const EndpointIPtr& endpoint, const ObjectAdapterPtr& adapter, @@ -881,23 +1479,22 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance { connection = new ConnectionI(_instance, _transceiver, _endpoint, _adapter, _threadPerConnection, _threadPerConnectionStackSize); - connection->start(); - connection->validate(); } catch(const LocalException&) { - // - // If a connection object was constructed, then validate() - // must have raised the exception. - // - if(connection) + try { - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + _transceiver->close(); } - - return; + catch(const Ice::LocalException&) + { + // Ignore + } + throw; } + connection->start(0); + _connections.push_back(connection); } else @@ -1148,33 +1745,44 @@ IceInternal::IncomingConnectionFactory::run() // // Create a connection object for the connection. // - if(transceiver) + if(!transceiver) + { + continue; + } + + try + { + connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, + _threadPerConnectionStackSize); + } + catch(const LocalException& ex) { try { - connection = new ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection, - _threadPerConnectionStackSize); - connection->start(); + transceiver->close(); } - catch(const LocalException&) + catch(const Ice::LocalException&) { - return; } - _connections.push_back(connection); + if(_warn) + { + Warning out(_instance->initializationData().logger); + out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); + } + continue; } + + _connections.push_back(connection); } // - // In thread per connection mode, the connection's thread will - // take care of connection validation and activation (for - // non-datagram connections). We don't want to block this - // thread waiting until validation is complete, because in - // contrast to thread pool mode, it is the only thread that - // can accept connections with this factory's - // acceptor. Therefore we don't call validate() and activate() - // from the connection factory in thread per connection mode. + // In thread-per-connection mode and regardless of the background mode, + // start() doesn't block. The connection thread is started and takes + // care of the connection validation and notifies the factory through + // the callback when it's done. // + connection->start(this); } } |