diff options
Diffstat (limited to 'cpp/src/IceLocatorDiscovery/PluginI.cpp')
-rw-r--r-- | cpp/src/IceLocatorDiscovery/PluginI.cpp | 419 |
1 files changed, 275 insertions, 144 deletions
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp index d68d4eacf34..7948fa13870 100644 --- a/cpp/src/IceLocatorDiscovery/PluginI.cpp +++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp @@ -9,50 +9,14 @@ #include <IceUtil/IceUtil.h> #include <Ice/Ice.h> +#include <Ice/Network.h> // For getInterfacesForMulticast -#include <IceLocatorDiscovery/PluginI.h> +#include <IceLocatorDiscovery/Plugin.h> #include <IceLocatorDiscovery/IceLocatorDiscovery.h> using namespace std; using namespace IceLocatorDiscovery; -#ifndef ICE_LOCATOR_DISCOVERY_API -# ifdef ICE_LOCATOR_DISCOVERY_API_EXPORTS -# define ICE_LOCATOR_DISCOVERY_API ICE_DECLSPEC_EXPORT -# else -# define ICE_LOCATOR_DISCOVERY_API /**/ -# endif -#endif - -// -// Plugin factory function. -// -extern "C" ICE_LOCATOR_DISCOVERY_API Ice::Plugin* -createIceLocatorDiscovery(const Ice::CommunicatorPtr& communicator, const string&, const Ice::StringSeq&) -{ - return new PluginI(communicator); -} - -namespace Ice -{ - -ICE_LOCATOR_DISCOVERY_API void -registerIceLocatorDiscovery(bool loadOnInitialize) -{ - Ice::registerPluginFactory("IceLocatorDiscovery", createIceLocatorDiscovery, loadOnInitialize); -} - -} - -// -// Objective-C function to allow Objective-C programs to register plugin. -// -extern "C" ICE_LOCATOR_DISCOVERY_API void -ICEregisterIceLocatorDiscovery(bool loadOnInitialize) -{ - Ice::registerIceLocatorDiscovery(loadOnInitialize); -} - namespace { @@ -136,7 +100,7 @@ class LocatorI : public Ice::BlobjectArrayAsync, { public: - LocatorI(const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&); + LocatorI(const string&, const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&); void setLookupReply(const LookupReplyPrxPtr&); #ifdef ICE_CPP11_MAPPING @@ -152,11 +116,13 @@ public: void foundLocator(const Ice::LocatorPrxPtr&); void invoke(const Ice::LocatorPrxPtr&, const RequestPtr&); + vector<Ice::LocatorPrxPtr> getLocators(const string&, const IceUtil::Time&); + private: virtual void runTimerTask(); - const LookupPrxPtr _lookup; + vector<pair<LookupPrxPtr, LookupReplyPrxPtr> > _lookup; const IceUtil::Time _timeout; const int _retryCount; const IceUtil::Time _retryDelay; @@ -164,8 +130,8 @@ private: string _instanceName; bool _warned; - LookupReplyPrxPtr _lookupReply; Ice::LocatorPrxPtr _locator; + map<string, Ice::LocatorPrxPtr> _locators; Ice::LocatorPrxPtr _voidLocator; IceUtil::Time _nextRetry; @@ -246,9 +212,58 @@ public: } }; +class PluginI : public Plugin +{ +public: + + PluginI(const std::string&, const Ice::CommunicatorPtr&); + + virtual void initialize(); + virtual void destroy(); + virtual vector<Ice::LocatorPrxPtr> getLocators(const string&, const IceUtil::Time&) const; + +private: + + const string _name; + const Ice::CommunicatorPtr _communicator; + Ice::ObjectAdapterPtr _locatorAdapter; + Ice::ObjectAdapterPtr _replyAdapter; + LocatorIPtr _locator; +}; + +} + +// +// Plugin factory function. +// +extern "C" ICE_LOCATOR_DISCOVERY_API Ice::Plugin* +createIceLocatorDiscovery(const Ice::CommunicatorPtr& communicator, const string& name, const Ice::StringSeq&) +{ + return new PluginI(name, communicator); +} + +namespace Ice +{ + +ICE_LOCATOR_DISCOVERY_API void +registerIceLocatorDiscovery(bool loadOnInitialize) +{ + Ice::registerPluginFactory("IceLocatorDiscovery", createIceLocatorDiscovery, loadOnInitialize); +} + } -PluginI::PluginI(const Ice::CommunicatorPtr& communicator) : _communicator(communicator) +// +// Objective-C function to allow Objective-C programs to register plugin. +// +extern "C" ICE_LOCATOR_DISCOVERY_API void +ICEregisterIceLocatorDiscovery(bool loadOnInitialize) +{ + Ice::registerIceLocatorDiscovery(loadOnInitialize); +} + +PluginI::PluginI(const string& name, const Ice::CommunicatorPtr& communicator) : + _name(name), _communicator(communicator) { } @@ -262,109 +277,80 @@ PluginI::initialize() string address; if(ipv4 && !preferIPv6) { - address = properties->getPropertyWithDefault("IceLocatorDiscovery.Address", "239.255.0.1"); + address = properties->getPropertyWithDefault(_name + ".Address", "239.255.0.1"); } else { - address = properties->getPropertyWithDefault("IceLocatorDiscovery.Address", "ff15::1"); + address = properties->getPropertyWithDefault(_name + ".Address", "ff15::1"); } - int port = properties->getPropertyAsIntWithDefault("IceLocatorDiscovery.Port", 4061); - string intf = properties->getProperty("IceLocatorDiscovery.Interface"); + int port = properties->getPropertyAsIntWithDefault(_name + ".Port", 4061); + string intf = properties->getProperty(_name + ".Interface"); - if(properties->getProperty("IceLocatorDiscovery.Reply.Endpoints").empty()) + string lookupEndpoints = properties->getProperty(_name + ".Lookup"); + if(lookupEndpoints.empty()) { - ostringstream os; - os << "udp"; - if(!intf.empty()) + // + // If no lookup endpoints are specified, we get all the network interfaces and create + // an endpoint for each of them. We'll send UDP multicast packages on each interface. + // + IceInternal::ProtocolSupport protocol = ipv4 && !preferIPv6 ? IceInternal::EnableIPv4 : IceInternal::EnableIPv6; + vector<string> interfaces = IceInternal::getInterfacesForMulticast(intf, protocol); + ostringstream lookup; + for(vector<string>::const_iterator p = interfaces.begin(); p != interfaces.end(); ++p) { - os << " -h \"" << intf << "\""; + if(p != interfaces.begin()) + { + lookup << ":"; + } + lookup << "udp -h \"" << address << "\" -p " << port << " --interface \"" << *p << "\""; } - properties->setProperty("IceLocatorDiscovery.Reply.Endpoints", os.str()); + lookupEndpoints = lookup.str(); + } + + if(properties->getProperty(_name + ".Reply.Endpoints").empty()) + { + properties->setProperty(_name + ".Reply.Endpoints", "udp -h " + (intf.empty() ? "*" : "\"" + intf + "\"")); } - if(properties->getProperty("IceLocatorDiscovery.Locator.Endpoints").empty()) + + if(properties->getProperty(_name + ".Locator.Endpoints").empty()) { - properties->setProperty("IceLocatorDiscovery.Locator.AdapterId", Ice::generateUUID()); // Collocated adapter + properties->setProperty(_name + ".Locator.AdapterId", Ice::generateUUID()); // Collocated adapter } - _replyAdapter = _communicator->createObjectAdapter("IceLocatorDiscovery.Reply"); - _locatorAdapter = _communicator->createObjectAdapter("IceLocatorDiscovery.Locator"); + _replyAdapter = _communicator->createObjectAdapter(_name + ".Reply"); + _locatorAdapter = _communicator->createObjectAdapter(_name + ".Locator"); // We don't want those adapters to be registered with the locator so clear their locator. _replyAdapter->setLocator(0); _locatorAdapter->setLocator(0); - string lookupEndpoints = properties->getProperty("IceLocatorDiscovery.Lookup"); - if(lookupEndpoints.empty()) - { - ostringstream os; - os << "udp -h \"" << address << "\" -p " << port; - if(!intf.empty()) - { - os << " --interface \"" << intf << "\""; - } - lookupEndpoints = os.str(); - } - Ice::ObjectPrxPtr lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints); lookupPrx = lookupPrx->ice_collocationOptimized(false); // No collocation optimization for the multicast proxy! - try - { - // Ensure we can establish a connection to the multicast proxy - // but don't block. -#ifdef ICE_CPP11_MAPPING - promise<bool> sent; - promise<void> completed; - - lookupPrx->ice_getConnectionAsync( - [&](shared_ptr<Ice::Connection>) - { - completed.set_value(); - }, - [&](exception_ptr ex) - { - completed.set_exception(ex); - }, - [&](bool sentSynchronously) - { - sent.set_value(sentSynchronously); - }); - if(sent.get_future().get()) - { - completed.get_future().get(); - } -#else - Ice::AsyncResultPtr result = lookupPrx->begin_ice_getConnection(); - if(result->sentSynchronously()) - { - lookupPrx->end_ice_getConnection(result); - } -#endif - } - catch(const Ice::LocalException& ex) - { - ostringstream os; - os << "IceLocatorDiscovery is unable to establish a multicast connection:\n"; - os << "proxy = " << lookupPrx << '\n'; - os << ex; - throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str()); - } - Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI))); + Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx, + _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI))); - string instanceName = properties->getProperty("IceLocatorDiscovery.InstanceName"); + string instanceName = properties->getProperty(_name + ".InstanceName"); Ice::Identity id; id.name = "Locator"; id.category = !instanceName.empty() ? instanceName : Ice::generateUUID(); - LocatorIPtr locator = ICE_MAKE_SHARED(LocatorI, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName, voidLocator); - _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(locator, id))); + _locator = ICE_MAKE_SHARED(LocatorI, _name, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName, + voidLocator); + _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(_locator, id))); - Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, locator))->ice_datagram(); - locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply)); + Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, _locator))->ice_datagram(); + _locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply)); _replyAdapter->activate(); _locatorAdapter->activate(); } +vector<Ice::LocatorPrxPtr> +PluginI::getLocators(const string& instanceName, const IceUtil::Time& waitTime) const +{ + return _locator->getLocators(instanceName, waitTime); +} + void PluginI::destroy() { @@ -545,14 +531,14 @@ Request::exception(const Ice::Exception& ex) #endif } -LocatorI::LocatorI(const LookupPrxPtr& lookup, +LocatorI::LocatorI(const string& name, + const LookupPrxPtr& lookup, const Ice::PropertiesPtr& p, const string& instanceName, const Ice::LocatorPrxPtr& voidLocator) : - _lookup(lookup), - _timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.Timeout", 300))), - _retryCount(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryCount", 3)), - _retryDelay(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryDelay", 2000))), + _timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".Timeout", 300))), + _retryCount(p->getPropertyAsIntWithDefault(name + ".RetryCount", 3)), + _retryDelay(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".RetryDelay", 2000))), _timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())), _instanceName(instanceName), _warned(false), @@ -560,12 +546,77 @@ LocatorI::LocatorI(const LookupPrxPtr& lookup, _voidLocator(voidLocator), _pendingRetryCount(0) { +#ifndef ICE_CPP11_MAPPING + __setNoDelete(true); +#endif + try + { + // Ensure we can establish a connection to the multicast proxy + lookup->ice_getConnection(); + } + catch(const Ice::LocalException& ex) + { + ostringstream os; + os << "IceLocatorDiscovery is unable to establish a multicast connection:\n"; + os << "proxy = " << lookup << '\n'; + os << ex; + throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str()); + } + + // + // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast + // datagram on each endpoint. + // + Ice::EndpointSeq endpoints = lookup->ice_getEndpoints(); + for(vector<Ice::EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + try + { + Ice::EndpointSeq single; + single.push_back(*p); + LookupPrxPtr l = lookup->ice_endpoints(single); + l->ice_getConnection(); + _lookup.push_back(make_pair(l, LookupReplyPrxPtr())); + } + catch(const Ice::LocalException&) + { + } + } + assert(!_lookup.empty()); +#ifndef ICE_CPP11_MAPPING + __setNoDelete(false); +#endif } void LocatorI::setLookupReply(const LookupReplyPrxPtr& lookupReply) { - _lookupReply = lookupReply; + // + // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams. + // + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookup.begin(); p != _lookup.end(); ++p) + { + Ice::UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(Ice::UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo()); + if(info && !info->mcastInterface.empty()) + { + Ice::EndpointSeq endpts = lookupReply->ice_getEndpoints(); + for(Ice::EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q) + { + Ice::IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(Ice::IPEndpointInfo, (*q)->getInfo()); + if(r && r->host == info->mcastInterface) + { + Ice::EndpointSeq single; + single.push_back(*q); + p->second = lookupReply->ice_endpoints(single); + } + } + } + + if(!p->second) + { + p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found. + } + } } #ifdef ICE_CPP11_MAPPING @@ -588,6 +639,50 @@ LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, } #endif +vector<Ice::LocatorPrxPtr> +LocatorI::getLocators(const string& instanceName, const IceUtil::Time& waitTime) +{ + // + // Clear locators from previous search + // + { + Lock sync(*this); + _locators.clear(); + } + + // + // Find a locator + // + invoke(ICE_NULLPTR, ICE_NULLPTR); + + // + // Wait for responses + // + if(instanceName.empty()) + { + IceUtil::ThreadControl::sleep(waitTime); + } + else + { + Lock sync(*this); + while(_locators.find(instanceName) == _locators.end() && _pendingRetryCount > 0) + { + timedWait(waitTime); + } + } + + // + // Return found locators + // + Lock sync(*this); + vector<Ice::LocatorPrxPtr> locators; + for(map<string, Ice::LocatorPrxPtr>::const_iterator p = _locators.begin(); p != _locators.end(); ++p) + { + locators.push_back(p->second); + } + return locators; +} + void LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator) { @@ -601,7 +696,8 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator) // If we already have a locator assigned, ensure the given locator // has the same identity, otherwise ignore it. // - if(_locator && locator->ice_getIdentity().category != _locator->ice_getIdentity().category) + if(!_pendingRequests.empty() && + _locator && locator->ice_getIdentity().category != _locator->ice_getIdentity().category) { if(!_warned) { @@ -624,13 +720,14 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator) _pendingRetryCount = 0; } - if(_locator) + Ice::LocatorPrxPtr l = _pendingRequests.empty() ? _locators[locator->ice_getIdentity().category] : _locator; + if(l) { // // We found another locator replica, append its endpoints to the // current locator proxy endpoints. // - Ice::EndpointSeq newEndpoints = _locator->ice_getEndpoints(); + Ice::EndpointSeq newEndpoints = l->ice_getEndpoints(); Ice::EndpointSeq endpts = locator->ice_getEndpoints(); for(Ice::EndpointSeq::const_iterator p = endpts.begin(); p != endpts.end(); ++p) { @@ -651,25 +748,35 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator) newEndpoints.push_back(*p); } } - _locator = _locator->ice_endpoints(newEndpoints); + l = l->ice_endpoints(newEndpoints); + } + else + { + l = locator; + } + + if(_pendingRequests.empty()) + { + _locators[locator->ice_getIdentity().category] = l; + notify(); } else { - _locator = locator; + _locator = l; if(_instanceName.empty()) { _instanceName = _locator->ice_getIdentity().category; // Stick to the first discovered locator. } - } - // - // Send pending requests if any. - // - for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) - { - (*p)->invoke(_locator); + // + // Send pending requests if any. + // + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_locator); + } + _pendingRequests.clear(); } - _pendingRequests.clear(); } void @@ -678,28 +785,41 @@ LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request) Lock sync(*this); if(_locator && _locator != locator) { - request->invoke(_locator); + if(request) + { + request->invoke(_locator); + } } else if(IceUtil::Time::now() < _nextRetry) { - request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires + if(request) + { + request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires + } } else { _locator = 0; - _pendingRequests.push_back(request); + if(request) + { + _pendingRequests.push_back(request); + } if(_pendingRetryCount == 0) // No request in progress { _pendingRetryCount = _retryCount; try { + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); + l != _lookup.end(); ++l) + { #ifdef ICE_CPP11_MAPPING - _lookup->findLocatorAsync(_instanceName, _lookupReply); // Send multicast request. + l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request. #else - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + l->first->begin_findLocator(_instanceName, l->second); // Send multicast request. #endif + } _timer->schedule(ICE_SHARED_FROM_THIS, _timeout); } catch(const Ice::LocalException&) @@ -723,11 +843,15 @@ LocatorI::runTimerTask() { try { + for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin(); + l != _lookup.end(); ++l) + { #ifdef ICE_CPP11_MAPPING - _lookup->findLocatorAsync(_instanceName, _lookupReply); // Send multicast request. + l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request. #else - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + l->first->begin_findLocator(_instanceName, l->second); // Send multicast request. #endif + } _timer->schedule(ICE_SHARED_FROM_THIS, _timeout); return; } @@ -737,11 +861,18 @@ LocatorI::runTimerTask() _pendingRetryCount = 0; } - for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + if(_pendingRequests.empty()) + { + notify(); + } + else { - (*p)->invoke(_voidLocator); // Send pending requests on void locator. + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_voidLocator); // Send pending requests on void locator. + } + _pendingRequests.clear(); } - _pendingRequests.clear(); _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires } |