diff options
Diffstat (limited to 'cpp/src/IceDiscovery/LookupI.cpp')
-rw-r--r-- | cpp/src/IceDiscovery/LookupI.cpp | 222 |
1 files changed, 127 insertions, 95 deletions
diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp index 0f554e397d8..76fb6525f03 100644 --- a/cpp/src/IceDiscovery/LookupI.cpp +++ b/cpp/src/IceDiscovery/LookupI.cpp @@ -56,24 +56,31 @@ AdapterRequest::response(const Ice::ObjectPrxPtr& proxy, bool isReplicaGroup) return true; } -#ifdef ICE_CPP11_MAPPING void -AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy) +AdapterRequest::finished(const ObjectPrxPtr& proxy) { if(proxy || _proxies.empty()) { +#ifdef ICE_CPP11_MAPPING Request<string>::finished(proxy); +#else + RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(proxy); +#endif return; } else if(_proxies.size() == 1) { +#ifdef ICE_CPP11_MAPPING Request<string>::finished(_proxies[0]); +#else + RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(_proxies[0]); +#endif return; } EndpointSeq endpoints; - shared_ptr<ObjectPrx> prx; - for(vector<shared_ptr<ObjectPrx>>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) + ObjectPrxPtr prx; + for(vector<ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) { if(!prx) { @@ -82,37 +89,12 @@ AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy) Ice::EndpointSeq endpts = (*p)->ice_getEndpoints(); copy(endpts.begin(), endpts.end(), back_inserter(endpoints)); } +#ifdef ICE_CPP11_MAPPING Request<string>::finished(prx->ice_endpoints(endpoints)); -} #else -void -AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy) -{ - if(proxy || _proxies.empty()) - { - RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(proxy); - return; - } - else if(_proxies.size() == 1) - { - RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(_proxies[0]); - return; - } - - Ice::EndpointSeq endpoints; - Ice::ObjectPrxPtr prx; - for(vector<Ice::ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p) - { - if(!prx) - { - prx = *p; - } - Ice::EndpointSeq endpts = (*p)->ice_getEndpoints(); - copy(endpts.begin(), endpts.end(), back_inserter(endpoints)); - } - RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(prx->ice_endpoints(endpoints)); -} + RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(prx->ice_endpoints(endpoints)); #endif +} void AdapterRequest::runTimerTask() @@ -134,13 +116,52 @@ ObjectRequest::runTimerTask() LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrxPtr& lookup, const Ice::PropertiesPtr& properties) : _registry(registry), - _lookup(lookup), _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))), _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)), _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)), _domainId(properties->getProperty("IceDiscovery.DomainId")), _timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())) { +#ifndef ICE_CPP11_MAPPING + __setNoDelete(true); +#endif + try + { + // Ensure we can establish a connection to the multicast proxy + lookup->ice_getConnection(); + } + catch(const Ice::LocalException& ex) + { + ostringstream os; + os << "IceDiscovery is unable to establish a multicast connection:\n"; + os << "proxy = " << lookup << '\n'; + os << ex; + throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str()); + } + + // + // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast + // datagram on each endpoint. + // + EndpointSeq endpoints = lookup->ice_getEndpoints(); + for(vector<EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + try + { + EndpointSeq single; + single.push_back(*p); + LookupPrxPtr l = lookup->ice_endpoints(single); + l->ice_getConnection(); + _lookup.push_back(make_pair(l, LookupReplyPrxPtr())); + } + catch(const Ice::LocalException&) + { + } + } + assert(!_lookup.empty()); +#ifndef ICE_CPP11_MAPPING + __setNoDelete(false); +#endif } LookupI::~LookupI() @@ -168,7 +189,32 @@ LookupI::destroy() void LookupI::setLookupReply(const LookupReplyPrxPtr& lookupReply) { - _lookupReply = lookupReply; + // + // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams. + // + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookup.begin(); p != _lookup.end(); ++p) + { + UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo()); + if(info && !info->mcastInterface.empty()) + { + EndpointSeq endpts = lookupReply->ice_getEndpoints(); + for(EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q) + { + IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(IPEndpointInfo, (*q)->getInfo()); + if(r && r->host == info->mcastInterface) + { + EndpointSeq single; + single.push_back(*q); + p->second = lookupReply->ice_endpoints(single); + } + } + } + + if(!p->second) + { + p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found. + } + } } void @@ -242,107 +288,85 @@ LookupI::findAdapterById(const string& domainId, const string& adapterId, const } } -#ifdef ICE_CPP11_MAPPING void -LookupI::findObject(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, const Ice::Identity& id) -{ - Lock sync(*this); - map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id); - if(p == _objectRequests.end()) - { - p = _objectRequests.insert(make_pair(id, make_shared<ObjectRequest>(shared_from_this(), id, _retryCount))).first; - } - - if(p->second->addCallback(response)) - { - try - { - _lookup->findObjectByIdAsync(_domainId, id, _lookupReply); - _timer->schedule(p->second, _timeout); - } - catch(const Ice::LocalException&) - { - p->second->finished(nullptr); - _objectRequests.erase(p); - } - } -} - -void -LookupI::findAdapter(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, const std::string& adapterId) -{ - Lock sync(*this); - map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId); - if(p == _adapterRequests.end()) - { - p = _adapterRequests.insert(make_pair(adapterId, make_shared<AdapterRequest>(shared_from_this(), adapterId, _retryCount))).first; - } - - if(p->second->addCallback(response)) - { - try - { - _lookup->findAdapterByIdAsync(_domainId, adapterId, _lookupReply); - _timer->schedule(p->second, _timeout); - } - catch(const Ice::LocalException&) - { - p->second->finished(0); - _adapterRequests.erase(p); - } - } -} +#ifdef ICE_CPP11_MAPPING +LookupI::findObject(function<void(const shared_ptr<Ice::ObjectPrx>&)> cb, const Ice::Identity& id) #else -void LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id) +#endif { Lock sync(*this); map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id); if(p == _objectRequests.end()) { - p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first; + p = _objectRequests.insert(make_pair(id, ICE_MAKE_SHARED(ObjectRequest, + ICE_SHARED_FROM_THIS, + id, + _retryCount))).first; } if(p->second->addCallback(cb)) { try { - _lookup->begin_findObjectById(_domainId, id, _lookupReply); + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end(); + ++l) + { +#ifdef ICE_CPP11_MAPPING + l->first->findObjectByIdAsync(_domainId, id, l->second); +#else + l->first->begin_findObjectById(_domainId, id, l->second); +#endif + } _timer->schedule(p->second, _timeout); } catch(const Ice::LocalException&) { - p->second->finished(0); + p->second->finished(ICE_NULLPTR); _objectRequests.erase(p); } } } void +#ifdef ICE_CPP11_MAPPING +LookupI::findAdapter(function<void(const shared_ptr<Ice::ObjectPrx>&)> cb, const std::string& adapterId) +#else LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId) +#endif { Lock sync(*this); map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId); if(p == _adapterRequests.end()) { - p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first; + p = _adapterRequests.insert(make_pair(adapterId, ICE_MAKE_SHARED(AdapterRequest, + ICE_SHARED_FROM_THIS, + adapterId, + _retryCount))).first; } if(p->second->addCallback(cb)) { try { - _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply); + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end(); + ++l) + { +#ifdef ICE_CPP11_MAPPING + l->first->findAdapterByIdAsync(_domainId, adapterId, l->second); +#else + l->first->begin_findAdapterById(_domainId, adapterId, l->second); +#endif + } _timer->schedule(p->second, _timeout); } catch(const Ice::LocalException&) { - p->second->finished(0); + p->second->finished(ICE_NULLPTR); _adapterRequests.erase(p); } } } -#endif void LookupI::foundObject(const Ice::Identity& id, const Ice::ObjectPrxPtr& proxy) @@ -390,11 +414,15 @@ LookupI::objectRequestTimedOut(const ObjectRequestPtr& request) { try { + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end(); + ++l) + { #ifdef ICE_CPP11_MAPPING - _lookup->findObjectByIdAsync(_domainId, request->getId(), _lookupReply); + l->first->findObjectByIdAsync(_domainId, request->getId(), l->second); #else - _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply); + l->first->begin_findObjectById(_domainId, request->getId(), l->second); #endif + } _timer->schedule(p->second, _timeout); return; } @@ -422,11 +450,15 @@ LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request) { try { + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end(); + ++l) + { #ifdef ICE_CPP11_MAPPING - _lookup->findAdapterByIdAsync(_domainId, request->getId(), _lookupReply); + l->first->findAdapterByIdAsync(_domainId, request->getId(), l->second); #else - _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply); + l->first->begin_findAdapterById(_domainId, request->getId(), l->second); #endif + } _timer->schedule(p->second, _timeout); return; } |