summaryrefslogtreecommitdiff
path: root/cpp/src/IceDiscovery/LookupI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceDiscovery/LookupI.cpp')
-rw-r--r--cpp/src/IceDiscovery/LookupI.cpp222
1 files changed, 127 insertions, 95 deletions
diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp
index 0f554e397d8..76fb6525f03 100644
--- a/cpp/src/IceDiscovery/LookupI.cpp
+++ b/cpp/src/IceDiscovery/LookupI.cpp
@@ -56,24 +56,31 @@ AdapterRequest::response(const Ice::ObjectPrxPtr& proxy, bool isReplicaGroup)
return true;
}
-#ifdef ICE_CPP11_MAPPING
void
-AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy)
+AdapterRequest::finished(const ObjectPrxPtr& proxy)
{
if(proxy || _proxies.empty())
{
+#ifdef ICE_CPP11_MAPPING
Request<string>::finished(proxy);
+#else
+ RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(proxy);
+#endif
return;
}
else if(_proxies.size() == 1)
{
+#ifdef ICE_CPP11_MAPPING
Request<string>::finished(_proxies[0]);
+#else
+ RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(_proxies[0]);
+#endif
return;
}
EndpointSeq endpoints;
- shared_ptr<ObjectPrx> prx;
- for(vector<shared_ptr<ObjectPrx>>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
+ ObjectPrxPtr prx;
+ for(vector<ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
{
if(!prx)
{
@@ -82,37 +89,12 @@ AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy)
Ice::EndpointSeq endpts = (*p)->ice_getEndpoints();
copy(endpts.begin(), endpts.end(), back_inserter(endpoints));
}
+#ifdef ICE_CPP11_MAPPING
Request<string>::finished(prx->ice_endpoints(endpoints));
-}
#else
-void
-AdapterRequest::finished(const Ice::ObjectPrxPtr& proxy)
-{
- if(proxy || _proxies.empty())
- {
- RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(proxy);
- return;
- }
- else if(_proxies.size() == 1)
- {
- RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(_proxies[0]);
- return;
- }
-
- Ice::EndpointSeq endpoints;
- Ice::ObjectPrxPtr prx;
- for(vector<Ice::ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
- {
- if(!prx)
- {
- prx = *p;
- }
- Ice::EndpointSeq endpts = (*p)->ice_getEndpoints();
- copy(endpts.begin(), endpts.end(), back_inserter(endpoints));
- }
- RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(prx->ice_endpoints(endpoints));
-}
+ RequestT<string, AMD_Locator_findAdapterByIdPtr>::finished(prx->ice_endpoints(endpoints));
#endif
+}
void
AdapterRequest::runTimerTask()
@@ -134,13 +116,52 @@ ObjectRequest::runTimerTask()
LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrxPtr& lookup, const Ice::PropertiesPtr& properties) :
_registry(registry),
- _lookup(lookup),
_timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))),
_retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)),
_latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)),
_domainId(properties->getProperty("IceDiscovery.DomainId")),
_timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator()))
{
+#ifndef ICE_CPP11_MAPPING
+ __setNoDelete(true);
+#endif
+ try
+ {
+ // Ensure we can establish a connection to the multicast proxy
+ lookup->ice_getConnection();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ ostringstream os;
+ os << "IceDiscovery is unable to establish a multicast connection:\n";
+ os << "proxy = " << lookup << '\n';
+ os << ex;
+ throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
+ }
+
+ //
+ // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
+ // datagram on each endpoint.
+ //
+ EndpointSeq endpoints = lookup->ice_getEndpoints();
+ for(vector<EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ try
+ {
+ EndpointSeq single;
+ single.push_back(*p);
+ LookupPrxPtr l = lookup->ice_endpoints(single);
+ l->ice_getConnection();
+ _lookup.push_back(make_pair(l, LookupReplyPrxPtr()));
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ assert(!_lookup.empty());
+#ifndef ICE_CPP11_MAPPING
+ __setNoDelete(false);
+#endif
}
LookupI::~LookupI()
@@ -168,7 +189,32 @@ LookupI::destroy()
void
LookupI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
{
- _lookupReply = lookupReply;
+ //
+ // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
+ //
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookup.begin(); p != _lookup.end(); ++p)
+ {
+ UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo());
+ if(info && !info->mcastInterface.empty())
+ {
+ EndpointSeq endpts = lookupReply->ice_getEndpoints();
+ for(EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q)
+ {
+ IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(IPEndpointInfo, (*q)->getInfo());
+ if(r && r->host == info->mcastInterface)
+ {
+ EndpointSeq single;
+ single.push_back(*q);
+ p->second = lookupReply->ice_endpoints(single);
+ }
+ }
+ }
+
+ if(!p->second)
+ {
+ p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found.
+ }
+ }
}
void
@@ -242,107 +288,85 @@ LookupI::findAdapterById(const string& domainId, const string& adapterId, const
}
}
-#ifdef ICE_CPP11_MAPPING
void
-LookupI::findObject(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, const Ice::Identity& id)
-{
- Lock sync(*this);
- map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
- if(p == _objectRequests.end())
- {
- p = _objectRequests.insert(make_pair(id, make_shared<ObjectRequest>(shared_from_this(), id, _retryCount))).first;
- }
-
- if(p->second->addCallback(response))
- {
- try
- {
- _lookup->findObjectByIdAsync(_domainId, id, _lookupReply);
- _timer->schedule(p->second, _timeout);
- }
- catch(const Ice::LocalException&)
- {
- p->second->finished(nullptr);
- _objectRequests.erase(p);
- }
- }
-}
-
-void
-LookupI::findAdapter(function<void(const shared_ptr<Ice::ObjectPrx>&)> response, const std::string& adapterId)
-{
- Lock sync(*this);
- map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
- if(p == _adapterRequests.end())
- {
- p = _adapterRequests.insert(make_pair(adapterId, make_shared<AdapterRequest>(shared_from_this(), adapterId, _retryCount))).first;
- }
-
- if(p->second->addCallback(response))
- {
- try
- {
- _lookup->findAdapterByIdAsync(_domainId, adapterId, _lookupReply);
- _timer->schedule(p->second, _timeout);
- }
- catch(const Ice::LocalException&)
- {
- p->second->finished(0);
- _adapterRequests.erase(p);
- }
- }
-}
+#ifdef ICE_CPP11_MAPPING
+LookupI::findObject(function<void(const shared_ptr<Ice::ObjectPrx>&)> cb, const Ice::Identity& id)
#else
-void
LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id)
+#endif
{
Lock sync(*this);
map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
if(p == _objectRequests.end())
{
- p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first;
+ p = _objectRequests.insert(make_pair(id, ICE_MAKE_SHARED(ObjectRequest,
+ ICE_SHARED_FROM_THIS,
+ id,
+ _retryCount))).first;
}
if(p->second->addCallback(cb))
{
try
{
- _lookup->begin_findObjectById(_domainId, id, _lookupReply);
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end();
+ ++l)
+ {
+#ifdef ICE_CPP11_MAPPING
+ l->first->findObjectByIdAsync(_domainId, id, l->second);
+#else
+ l->first->begin_findObjectById(_domainId, id, l->second);
+#endif
+ }
_timer->schedule(p->second, _timeout);
}
catch(const Ice::LocalException&)
{
- p->second->finished(0);
+ p->second->finished(ICE_NULLPTR);
_objectRequests.erase(p);
}
}
}
void
+#ifdef ICE_CPP11_MAPPING
+LookupI::findAdapter(function<void(const shared_ptr<Ice::ObjectPrx>&)> cb, const std::string& adapterId)
+#else
LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId)
+#endif
{
Lock sync(*this);
map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
if(p == _adapterRequests.end())
{
- p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first;
+ p = _adapterRequests.insert(make_pair(adapterId, ICE_MAKE_SHARED(AdapterRequest,
+ ICE_SHARED_FROM_THIS,
+ adapterId,
+ _retryCount))).first;
}
if(p->second->addCallback(cb))
{
try
{
- _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply);
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end();
+ ++l)
+ {
+#ifdef ICE_CPP11_MAPPING
+ l->first->findAdapterByIdAsync(_domainId, adapterId, l->second);
+#else
+ l->first->begin_findAdapterById(_domainId, adapterId, l->second);
+#endif
+ }
_timer->schedule(p->second, _timeout);
}
catch(const Ice::LocalException&)
{
- p->second->finished(0);
+ p->second->finished(ICE_NULLPTR);
_adapterRequests.erase(p);
}
}
}
-#endif
void
LookupI::foundObject(const Ice::Identity& id, const Ice::ObjectPrxPtr& proxy)
@@ -390,11 +414,15 @@ LookupI::objectRequestTimedOut(const ObjectRequestPtr& request)
{
try
{
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end();
+ ++l)
+ {
#ifdef ICE_CPP11_MAPPING
- _lookup->findObjectByIdAsync(_domainId, request->getId(), _lookupReply);
+ l->first->findObjectByIdAsync(_domainId, request->getId(), l->second);
#else
- _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply);
+ l->first->begin_findObjectById(_domainId, request->getId(), l->second);
#endif
+ }
_timer->schedule(p->second, _timeout);
return;
}
@@ -422,11 +450,15 @@ LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request)
{
try
{
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); l != _lookup.end();
+ ++l)
+ {
#ifdef ICE_CPP11_MAPPING
- _lookup->findAdapterByIdAsync(_domainId, request->getId(), _lookupReply);
+ l->first->findAdapterByIdAsync(_domainId, request->getId(), l->second);
#else
- _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply);
+ l->first->begin_findAdapterById(_domainId, request->getId(), l->second);
#endif
+ }
_timer->schedule(p->second, _timeout);
return;
}