diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionFactory.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 221 |
1 files changed, 93 insertions, 128 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 6bcc4c9ca20..9bba761af6d 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -60,11 +60,11 @@ IceInternal::OutgoingConnectionFactory::destroy() #ifdef _STLP_BEGIN_NAMESPACE // voidbind2nd is an STLport extension for broken compilers in IceUtil/Functional.h for_each(_connections.begin(), _connections.end(), - voidbind2nd(Ice::secondVoidMemFun1<EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> + voidbind2nd(Ice::secondVoidMemFun1<ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #else for_each(_connections.begin(), _connections.end(), - bind2nd(Ice::secondVoidMemFun1<const EndpointIPtr, ConnectionI, ConnectionI::DestructionReason> + bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); #endif @@ -75,7 +75,7 @@ IceInternal::OutgoingConnectionFactory::destroy() void IceInternal::OutgoingConnectionFactory::waitUntilFinished() { - multimap<EndpointIPtr, ConnectionIPtr> connections; + multimap<ConnectorPtr, ConnectionIPtr> connections; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -98,7 +98,7 @@ IceInternal::OutgoingConnectionFactory::waitUntilFinished() } for_each(connections.begin(), connections.end(), - Ice::secondVoidMemFun<const EndpointIPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); + Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); } ConnectionIPtr @@ -107,7 +107,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt bool& compress) { assert(!endpts.empty()); - vector<EndpointIPtr> endpoints = endpts; + vector<pair<ConnectorPtr, EndpointIPtr> > connectors; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -120,7 +120,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // // Reap connections for which destruction has completed. // - std::multimap<EndpointIPtr, ConnectionIPtr>::iterator p = _connections.begin(); + std::multimap<ConnectorPtr, ConnectionIPtr>::iterator p = _connections.begin(); while(p != _connections.end()) { if(p->second->isFinished()) @@ -133,37 +133,48 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } } - // - // Modify endpoints with overrides. - // + vector<EndpointIPtr> endpoints = endpts; vector<EndpointIPtr>::iterator q; for(q = endpoints.begin(); q != endpoints.end(); ++q) { + // + // Modify endpoints with overrides. + // if(_instance->defaultsAndOverrides()->overrideTimeout) { *q = (*q)->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); } // - // The Connection object does not take the compression flag of - // endpoints into account, but instead gets the information - // about whether messages should be compressed or not from - // other sources. In order to allow connection sharing for - // endpoints that differ in the value of the compression flag - // only, we always set the compression flag to false here in - // this connection factory. + // Create connectors for the endpoints. // - *q = (*q)->compress(false); + vector<ConnectorPtr> cons = (*q)->connectors(); + assert(cons.size() > 0); + + // + // Shuffle connectors is endpoint selection type is Random. + // + if(selType == Random) + { + RandomNumberGenerator rng; + random_shuffle(cons.begin(), cons.end(), rng); + } + + vector<ConnectorPtr>::const_iterator r; + for(r = cons.begin(); r != cons.end(); ++r) + { + connectors.push_back(make_pair(*r, *q)); + } } // // Search for existing connections. // - vector<EndpointIPtr>::const_iterator r; - for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) + vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator r; + for(r = connectors.begin(); r != connectors.end(); ++r) { - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); + pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator, + multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first); while(pr.first != pr.second) { @@ -181,7 +192,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } else { - compress = (*r)->compress(); + compress = (*r).second->compress(); } return pr.first->second; @@ -199,15 +210,15 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt bool searchAgain = false; while(!_destroyed) { - for(q = endpoints.begin(); q != endpoints.end(); ++q) + for(r = connectors.begin(); r != connectors.end(); ++r) { - if(_pending.find(*q) != _pending.end()) + if(_pending.find((*r).first) != _pending.end()) { break; } } - if(q == endpoints.end()) + if(r == connectors.end()) { break; } @@ -228,10 +239,10 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // if(searchAgain) { - for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) + for(r = connectors.begin(); r != connectors.end(); ++r) { - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(*q); + pair<multimap<ConnectorPtr, ConnectionIPtr>::iterator, + multimap<ConnectorPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range((*r).first); while(pr.first != pr.second) { @@ -249,7 +260,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } else { - compress = (*r)->compress(); + compress = (*r).second->compress(); } return pr.first->second; @@ -266,100 +277,45 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // create connections to the same endpoints, we add our // endpoints to _pending. // - _pending.insert(endpoints.begin(), endpoints.end()); + for(r = connectors.begin(); r != connectors.end(); ++r) + { + _pending.insert((*r).first); + } } + ConnectorPtr connector; ConnectionIPtr connection; auto_ptr<LocalException> exception; - vector<EndpointIPtr>::const_iterator q; - vector<EndpointIPtr>::const_iterator r; - for(q = endpoints.begin(), r = endpts.begin(); q != endpoints.end(); ++q, ++r) + vector<pair<ConnectorPtr, EndpointIPtr> >::const_iterator q; + for(q = connectors.begin(); q != connectors.end(); ++q) { - EndpointIPtr endpoint = *q; + connector = (*q).first; + EndpointIPtr endpoint = (*q).second; try { - vector<ConnectorPtr> connectors; - unsigned int size; - Int timeout; - vector<TransceiverPtr> transceivers = endpoint->clientTransceivers(); - if(transceivers.size() == 0) + int timeout; + if(_instance->defaultsAndOverrides()->overrideConnectTimeout) { - connectors = endpoint->connectors(); - size = connectors.size(); - assert(size > 0); - - if(selType == Random) - { - RandomNumberGenerator rng; - random_shuffle(connectors.begin(), connectors.end(), rng); - } - - if(_instance->defaultsAndOverrides()->overrideConnectTimeout) - { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; - } - // It is not necessary to check for overrideTimeout, - // the endpoint has already been modified with this - // override, if set. - else - { - timeout = endpoint->timeout(); - } - } + timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + } + // It is not necessary to check for overrideTimeout, + // the endpoint has already been modified with this + // override, if set. else { - size = transceivers.size(); - if(selType == Random) - { - RandomNumberGenerator rng; - random_shuffle(transceivers.begin(), transceivers.end(), rng); - } + timeout = endpoint->timeout(); } - for(unsigned int i = 0; i < size; ++i) - { - try - { - TransceiverPtr transceiver; - if(transceivers.size() == size) - { - transceiver = transceivers[i]; - } - else - { - transceiver = connectors[i]->connect(timeout); - assert(transceiver); - } + TransceiverPtr transceiver = connector->connect(timeout); + assert(transceiver); - connection = new ConnectionI(_instance, transceiver, endpoint, 0, threadPerConnection, + connection = new ConnectionI(_instance, transceiver, endpoint->compress(false), 0, threadPerConnection, _instance->threadPerConnectionStackSize()); - connection->start(); - connection->validate(); - } - catch(const LocalException& ex) - { - // - // If a connection object was constructed, then validate() - // must have raised the exception. - // - if(connection) - { - connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. - connection = 0; - } - - // - // Throw exception if this is last transceiver in list. - // - if(i == size - 1) - { - ex.ice_throw(); - } - } - } + connection->start(); + connection->validate(); if(_instance->defaultsAndOverrides()->overrideCompress) { @@ -367,13 +323,23 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } else { - compress = (*r)->compress(); + compress = endpoint->compress(); } break; } catch(const LocalException& ex) { exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + + // + // If a connection object was constructed, then validate() + // must have raised the exception. + // + if(connection) + { + connection->waitUntilFinished(); // We must call waitUntilFinished() for cleanup. + connection = 0; + } } TraceLevelsPtr traceLevels = _instance->traceLevels(); @@ -382,7 +348,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt Trace out(_instance->initializationData().logger, traceLevels->retryCat); out << "connection to endpoint failed"; - if(moreEndpts || q + 1 != endpoints.end()) + if(moreEndpts || q + 1 != connectors.end()) { out << ", trying next endpoint\n"; } @@ -401,9 +367,9 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Signal other threads that we are done with trying to // establish connections to our endpoints. // - for(q = endpoints.begin(); q != endpoints.end(); ++q) + for(q = connectors.begin(); q != connectors.end(); ++q) { - _pending.erase(*q); + _pending.erase((*q).first); } notifyAll(); @@ -414,8 +380,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt } else { - _connections.insert(_connections.end(), - pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); + _connections.insert(_connections.end(), pair<const ConnectorPtr, ConnectionIPtr>(connector, connection)); if(_destroyed) { @@ -477,22 +442,22 @@ IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& route // endpoint = endpoint->compress(false); - pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator, - multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint); - - while(pr.first != pr.second) + multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q; + for(q = _connections.begin(); q != _connections.end(); ++q) { - try + if((*q).second->endpoint() == endpoint) { - pr.first->second->setAdapter(adapter); - } - catch(const Ice::LocalException&) - { - // - // Ignore, the connection is being closed or closed. - // + try + { + (*q).second->setAdapter(adapter); + } + catch(const Ice::LocalException&) + { + // + // Ignore, the connection is being closed or closed. + // + } } - ++pr.first; } } } @@ -507,7 +472,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad return; } - for(multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) + for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { if(p->second->getAdapter() == adapter) { @@ -533,7 +498,7 @@ IceInternal::OutgoingConnectionFactory::flushBatchRequests() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - for(std::multimap<EndpointIPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); + for(std::multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) { @@ -919,7 +884,7 @@ IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const Instance _threadPerConnection = adapterImpl->getThreadPerConnection(); _threadPerConnectionStackSize = adapterImpl->getThreadPerConnectionStackSize(); - const_cast<TransceiverPtr&>(_transceiver) = _endpoint->serverTransceiver(const_cast<EndpointIPtr&>(_endpoint)); + const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(const_cast<EndpointIPtr&>(_endpoint)); if(_transceiver) { ConnectionIPtr connection; |