summaryrefslogtreecommitdiff
path: root/cpp/src/IceLocatorDiscovery/PluginI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceLocatorDiscovery/PluginI.cpp')
-rw-r--r--cpp/src/IceLocatorDiscovery/PluginI.cpp419
1 files changed, 275 insertions, 144 deletions
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp
index d68d4eacf34..7948fa13870 100644
--- a/cpp/src/IceLocatorDiscovery/PluginI.cpp
+++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp
@@ -9,50 +9,14 @@
#include <IceUtil/IceUtil.h>
#include <Ice/Ice.h>
+#include <Ice/Network.h> // For getInterfacesForMulticast
-#include <IceLocatorDiscovery/PluginI.h>
+#include <IceLocatorDiscovery/Plugin.h>
#include <IceLocatorDiscovery/IceLocatorDiscovery.h>
using namespace std;
using namespace IceLocatorDiscovery;
-#ifndef ICE_LOCATOR_DISCOVERY_API
-# ifdef ICE_LOCATOR_DISCOVERY_API_EXPORTS
-# define ICE_LOCATOR_DISCOVERY_API ICE_DECLSPEC_EXPORT
-# else
-# define ICE_LOCATOR_DISCOVERY_API /**/
-# endif
-#endif
-
-//
-// Plugin factory function.
-//
-extern "C" ICE_LOCATOR_DISCOVERY_API Ice::Plugin*
-createIceLocatorDiscovery(const Ice::CommunicatorPtr& communicator, const string&, const Ice::StringSeq&)
-{
- return new PluginI(communicator);
-}
-
-namespace Ice
-{
-
-ICE_LOCATOR_DISCOVERY_API void
-registerIceLocatorDiscovery(bool loadOnInitialize)
-{
- Ice::registerPluginFactory("IceLocatorDiscovery", createIceLocatorDiscovery, loadOnInitialize);
-}
-
-}
-
-//
-// Objective-C function to allow Objective-C programs to register plugin.
-//
-extern "C" ICE_LOCATOR_DISCOVERY_API void
-ICEregisterIceLocatorDiscovery(bool loadOnInitialize)
-{
- Ice::registerIceLocatorDiscovery(loadOnInitialize);
-}
-
namespace
{
@@ -136,7 +100,7 @@ class LocatorI : public Ice::BlobjectArrayAsync,
{
public:
- LocatorI(const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&);
+ LocatorI(const string&, const LookupPrxPtr&, const Ice::PropertiesPtr&, const string&, const Ice::LocatorPrxPtr&);
void setLookupReply(const LookupReplyPrxPtr&);
#ifdef ICE_CPP11_MAPPING
@@ -152,11 +116,13 @@ public:
void foundLocator(const Ice::LocatorPrxPtr&);
void invoke(const Ice::LocatorPrxPtr&, const RequestPtr&);
+ vector<Ice::LocatorPrxPtr> getLocators(const string&, const IceUtil::Time&);
+
private:
virtual void runTimerTask();
- const LookupPrxPtr _lookup;
+ vector<pair<LookupPrxPtr, LookupReplyPrxPtr> > _lookup;
const IceUtil::Time _timeout;
const int _retryCount;
const IceUtil::Time _retryDelay;
@@ -164,8 +130,8 @@ private:
string _instanceName;
bool _warned;
- LookupReplyPrxPtr _lookupReply;
Ice::LocatorPrxPtr _locator;
+ map<string, Ice::LocatorPrxPtr> _locators;
Ice::LocatorPrxPtr _voidLocator;
IceUtil::Time _nextRetry;
@@ -246,9 +212,58 @@ public:
}
};
+class PluginI : public Plugin
+{
+public:
+
+ PluginI(const std::string&, const Ice::CommunicatorPtr&);
+
+ virtual void initialize();
+ virtual void destroy();
+ virtual vector<Ice::LocatorPrxPtr> getLocators(const string&, const IceUtil::Time&) const;
+
+private:
+
+ const string _name;
+ const Ice::CommunicatorPtr _communicator;
+ Ice::ObjectAdapterPtr _locatorAdapter;
+ Ice::ObjectAdapterPtr _replyAdapter;
+ LocatorIPtr _locator;
+};
+
+}
+
+//
+// Plugin factory function.
+//
+extern "C" ICE_LOCATOR_DISCOVERY_API Ice::Plugin*
+createIceLocatorDiscovery(const Ice::CommunicatorPtr& communicator, const string& name, const Ice::StringSeq&)
+{
+ return new PluginI(name, communicator);
+}
+
+namespace Ice
+{
+
+ICE_LOCATOR_DISCOVERY_API void
+registerIceLocatorDiscovery(bool loadOnInitialize)
+{
+ Ice::registerPluginFactory("IceLocatorDiscovery", createIceLocatorDiscovery, loadOnInitialize);
+}
+
}
-PluginI::PluginI(const Ice::CommunicatorPtr& communicator) : _communicator(communicator)
+//
+// Objective-C function to allow Objective-C programs to register plugin.
+//
+extern "C" ICE_LOCATOR_DISCOVERY_API void
+ICEregisterIceLocatorDiscovery(bool loadOnInitialize)
+{
+ Ice::registerIceLocatorDiscovery(loadOnInitialize);
+}
+
+PluginI::PluginI(const string& name, const Ice::CommunicatorPtr& communicator) :
+ _name(name), _communicator(communicator)
{
}
@@ -262,109 +277,80 @@ PluginI::initialize()
string address;
if(ipv4 && !preferIPv6)
{
- address = properties->getPropertyWithDefault("IceLocatorDiscovery.Address", "239.255.0.1");
+ address = properties->getPropertyWithDefault(_name + ".Address", "239.255.0.1");
}
else
{
- address = properties->getPropertyWithDefault("IceLocatorDiscovery.Address", "ff15::1");
+ address = properties->getPropertyWithDefault(_name + ".Address", "ff15::1");
}
- int port = properties->getPropertyAsIntWithDefault("IceLocatorDiscovery.Port", 4061);
- string intf = properties->getProperty("IceLocatorDiscovery.Interface");
+ int port = properties->getPropertyAsIntWithDefault(_name + ".Port", 4061);
+ string intf = properties->getProperty(_name + ".Interface");
- if(properties->getProperty("IceLocatorDiscovery.Reply.Endpoints").empty())
+ string lookupEndpoints = properties->getProperty(_name + ".Lookup");
+ if(lookupEndpoints.empty())
{
- ostringstream os;
- os << "udp";
- if(!intf.empty())
+ //
+ // If no lookup endpoints are specified, we get all the network interfaces and create
+ // an endpoint for each of them. We'll send UDP multicast packages on each interface.
+ //
+ IceInternal::ProtocolSupport protocol = ipv4 && !preferIPv6 ? IceInternal::EnableIPv4 : IceInternal::EnableIPv6;
+ vector<string> interfaces = IceInternal::getInterfacesForMulticast(intf, protocol);
+ ostringstream lookup;
+ for(vector<string>::const_iterator p = interfaces.begin(); p != interfaces.end(); ++p)
{
- os << " -h \"" << intf << "\"";
+ if(p != interfaces.begin())
+ {
+ lookup << ":";
+ }
+ lookup << "udp -h \"" << address << "\" -p " << port << " --interface \"" << *p << "\"";
}
- properties->setProperty("IceLocatorDiscovery.Reply.Endpoints", os.str());
+ lookupEndpoints = lookup.str();
+ }
+
+ if(properties->getProperty(_name + ".Reply.Endpoints").empty())
+ {
+ properties->setProperty(_name + ".Reply.Endpoints", "udp -h " + (intf.empty() ? "*" : "\"" + intf + "\""));
}
- if(properties->getProperty("IceLocatorDiscovery.Locator.Endpoints").empty())
+
+ if(properties->getProperty(_name + ".Locator.Endpoints").empty())
{
- properties->setProperty("IceLocatorDiscovery.Locator.AdapterId", Ice::generateUUID()); // Collocated adapter
+ properties->setProperty(_name + ".Locator.AdapterId", Ice::generateUUID()); // Collocated adapter
}
- _replyAdapter = _communicator->createObjectAdapter("IceLocatorDiscovery.Reply");
- _locatorAdapter = _communicator->createObjectAdapter("IceLocatorDiscovery.Locator");
+ _replyAdapter = _communicator->createObjectAdapter(_name + ".Reply");
+ _locatorAdapter = _communicator->createObjectAdapter(_name + ".Locator");
// We don't want those adapters to be registered with the locator so clear their locator.
_replyAdapter->setLocator(0);
_locatorAdapter->setLocator(0);
- string lookupEndpoints = properties->getProperty("IceLocatorDiscovery.Lookup");
- if(lookupEndpoints.empty())
- {
- ostringstream os;
- os << "udp -h \"" << address << "\" -p " << port;
- if(!intf.empty())
- {
- os << " --interface \"" << intf << "\"";
- }
- lookupEndpoints = os.str();
- }
-
Ice::ObjectPrxPtr lookupPrx = _communicator->stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
lookupPrx = lookupPrx->ice_collocationOptimized(false); // No collocation optimization for the multicast proxy!
- try
- {
- // Ensure we can establish a connection to the multicast proxy
- // but don't block.
-#ifdef ICE_CPP11_MAPPING
- promise<bool> sent;
- promise<void> completed;
-
- lookupPrx->ice_getConnectionAsync(
- [&](shared_ptr<Ice::Connection>)
- {
- completed.set_value();
- },
- [&](exception_ptr ex)
- {
- completed.set_exception(ex);
- },
- [&](bool sentSynchronously)
- {
- sent.set_value(sentSynchronously);
- });
- if(sent.get_future().get())
- {
- completed.get_future().get();
- }
-#else
- Ice::AsyncResultPtr result = lookupPrx->begin_ice_getConnection();
- if(result->sentSynchronously())
- {
- lookupPrx->end_ice_getConnection(result);
- }
-#endif
- }
- catch(const Ice::LocalException& ex)
- {
- ostringstream os;
- os << "IceLocatorDiscovery is unable to establish a multicast connection:\n";
- os << "proxy = " << lookupPrx << '\n';
- os << ex;
- throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
- }
- Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI)));
+ Ice::LocatorPrxPtr voidLocator = ICE_UNCHECKED_CAST(Ice::LocatorPrx,
+ _locatorAdapter->addWithUUID(ICE_MAKE_SHARED(VoidLocatorI)));
- string instanceName = properties->getProperty("IceLocatorDiscovery.InstanceName");
+ string instanceName = properties->getProperty(_name + ".InstanceName");
Ice::Identity id;
id.name = "Locator";
id.category = !instanceName.empty() ? instanceName : Ice::generateUUID();
- LocatorIPtr locator = ICE_MAKE_SHARED(LocatorI, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName, voidLocator);
- _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(locator, id)));
+ _locator = ICE_MAKE_SHARED(LocatorI, _name, ICE_UNCHECKED_CAST(LookupPrx, lookupPrx), properties, instanceName,
+ voidLocator);
+ _communicator->setDefaultLocator(ICE_UNCHECKED_CAST(Ice::LocatorPrx, _locatorAdapter->add(_locator, id)));
- Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, locator))->ice_datagram();
- locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply));
+ Ice::ObjectPrxPtr lookupReply = _replyAdapter->addWithUUID(ICE_MAKE_SHARED(LookupReplyI, _locator))->ice_datagram();
+ _locator->setLookupReply(ICE_UNCHECKED_CAST(LookupReplyPrx, lookupReply));
_replyAdapter->activate();
_locatorAdapter->activate();
}
+vector<Ice::LocatorPrxPtr>
+PluginI::getLocators(const string& instanceName, const IceUtil::Time& waitTime) const
+{
+ return _locator->getLocators(instanceName, waitTime);
+}
+
void
PluginI::destroy()
{
@@ -545,14 +531,14 @@ Request::exception(const Ice::Exception& ex)
#endif
}
-LocatorI::LocatorI(const LookupPrxPtr& lookup,
+LocatorI::LocatorI(const string& name,
+ const LookupPrxPtr& lookup,
const Ice::PropertiesPtr& p,
const string& instanceName,
const Ice::LocatorPrxPtr& voidLocator) :
- _lookup(lookup),
- _timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.Timeout", 300))),
- _retryCount(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryCount", 3)),
- _retryDelay(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault("IceLocatorDiscovery.RetryDelay", 2000))),
+ _timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".Timeout", 300))),
+ _retryCount(p->getPropertyAsIntWithDefault(name + ".RetryCount", 3)),
+ _retryDelay(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".RetryDelay", 2000))),
_timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())),
_instanceName(instanceName),
_warned(false),
@@ -560,12 +546,77 @@ LocatorI::LocatorI(const LookupPrxPtr& lookup,
_voidLocator(voidLocator),
_pendingRetryCount(0)
{
+#ifndef ICE_CPP11_MAPPING
+ __setNoDelete(true);
+#endif
+ try
+ {
+ // Ensure we can establish a connection to the multicast proxy
+ lookup->ice_getConnection();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ ostringstream os;
+ os << "IceLocatorDiscovery is unable to establish a multicast connection:\n";
+ os << "proxy = " << lookup << '\n';
+ os << ex;
+ throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
+ }
+
+ //
+ // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
+ // datagram on each endpoint.
+ //
+ Ice::EndpointSeq endpoints = lookup->ice_getEndpoints();
+ for(vector<Ice::EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ try
+ {
+ Ice::EndpointSeq single;
+ single.push_back(*p);
+ LookupPrxPtr l = lookup->ice_endpoints(single);
+ l->ice_getConnection();
+ _lookup.push_back(make_pair(l, LookupReplyPrxPtr()));
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ assert(!_lookup.empty());
+#ifndef ICE_CPP11_MAPPING
+ __setNoDelete(false);
+#endif
}
void
LocatorI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
{
- _lookupReply = lookupReply;
+ //
+ // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
+ //
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookup.begin(); p != _lookup.end(); ++p)
+ {
+ Ice::UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(Ice::UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo());
+ if(info && !info->mcastInterface.empty())
+ {
+ Ice::EndpointSeq endpts = lookupReply->ice_getEndpoints();
+ for(Ice::EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q)
+ {
+ Ice::IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(Ice::IPEndpointInfo, (*q)->getInfo());
+ if(r && r->host == info->mcastInterface)
+ {
+ Ice::EndpointSeq single;
+ single.push_back(*q);
+ p->second = lookupReply->ice_endpoints(single);
+ }
+ }
+ }
+
+ if(!p->second)
+ {
+ p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found.
+ }
+ }
}
#ifdef ICE_CPP11_MAPPING
@@ -588,6 +639,50 @@ LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB,
}
#endif
+vector<Ice::LocatorPrxPtr>
+LocatorI::getLocators(const string& instanceName, const IceUtil::Time& waitTime)
+{
+ //
+ // Clear locators from previous search
+ //
+ {
+ Lock sync(*this);
+ _locators.clear();
+ }
+
+ //
+ // Find a locator
+ //
+ invoke(ICE_NULLPTR, ICE_NULLPTR);
+
+ //
+ // Wait for responses
+ //
+ if(instanceName.empty())
+ {
+ IceUtil::ThreadControl::sleep(waitTime);
+ }
+ else
+ {
+ Lock sync(*this);
+ while(_locators.find(instanceName) == _locators.end() && _pendingRetryCount > 0)
+ {
+ timedWait(waitTime);
+ }
+ }
+
+ //
+ // Return found locators
+ //
+ Lock sync(*this);
+ vector<Ice::LocatorPrxPtr> locators;
+ for(map<string, Ice::LocatorPrxPtr>::const_iterator p = _locators.begin(); p != _locators.end(); ++p)
+ {
+ locators.push_back(p->second);
+ }
+ return locators;
+}
+
void
LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator)
{
@@ -601,7 +696,8 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator)
// If we already have a locator assigned, ensure the given locator
// has the same identity, otherwise ignore it.
//
- if(_locator && locator->ice_getIdentity().category != _locator->ice_getIdentity().category)
+ if(!_pendingRequests.empty() &&
+ _locator && locator->ice_getIdentity().category != _locator->ice_getIdentity().category)
{
if(!_warned)
{
@@ -624,13 +720,14 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator)
_pendingRetryCount = 0;
}
- if(_locator)
+ Ice::LocatorPrxPtr l = _pendingRequests.empty() ? _locators[locator->ice_getIdentity().category] : _locator;
+ if(l)
{
//
// We found another locator replica, append its endpoints to the
// current locator proxy endpoints.
//
- Ice::EndpointSeq newEndpoints = _locator->ice_getEndpoints();
+ Ice::EndpointSeq newEndpoints = l->ice_getEndpoints();
Ice::EndpointSeq endpts = locator->ice_getEndpoints();
for(Ice::EndpointSeq::const_iterator p = endpts.begin(); p != endpts.end(); ++p)
{
@@ -651,25 +748,35 @@ LocatorI::foundLocator(const Ice::LocatorPrxPtr& locator)
newEndpoints.push_back(*p);
}
}
- _locator = _locator->ice_endpoints(newEndpoints);
+ l = l->ice_endpoints(newEndpoints);
+ }
+ else
+ {
+ l = locator;
+ }
+
+ if(_pendingRequests.empty())
+ {
+ _locators[locator->ice_getIdentity().category] = l;
+ notify();
}
else
{
- _locator = locator;
+ _locator = l;
if(_instanceName.empty())
{
_instanceName = _locator->ice_getIdentity().category; // Stick to the first discovered locator.
}
- }
- //
- // Send pending requests if any.
- //
- for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
- {
- (*p)->invoke(_locator);
+ //
+ // Send pending requests if any.
+ //
+ for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ {
+ (*p)->invoke(_locator);
+ }
+ _pendingRequests.clear();
}
- _pendingRequests.clear();
}
void
@@ -678,28 +785,41 @@ LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request)
Lock sync(*this);
if(_locator && _locator != locator)
{
- request->invoke(_locator);
+ if(request)
+ {
+ request->invoke(_locator);
+ }
}
else if(IceUtil::Time::now() < _nextRetry)
{
- request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires
+ if(request)
+ {
+ request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires
+ }
}
else
{
_locator = 0;
- _pendingRequests.push_back(request);
+ if(request)
+ {
+ _pendingRequests.push_back(request);
+ }
if(_pendingRetryCount == 0) // No request in progress
{
_pendingRetryCount = _retryCount;
try
{
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin();
+ l != _lookup.end(); ++l)
+ {
#ifdef ICE_CPP11_MAPPING
- _lookup->findLocatorAsync(_instanceName, _lookupReply); // Send multicast request.
+ l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request.
#else
- _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ l->first->begin_findLocator(_instanceName, l->second); // Send multicast request.
#endif
+ }
_timer->schedule(ICE_SHARED_FROM_THIS, _timeout);
}
catch(const Ice::LocalException&)
@@ -723,11 +843,15 @@ LocatorI::runTimerTask()
{
try
{
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin();
+ l != _lookup.end(); ++l)
+ {
#ifdef ICE_CPP11_MAPPING
- _lookup->findLocatorAsync(_instanceName, _lookupReply); // Send multicast request.
+ l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request.
#else
- _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ l->first->begin_findLocator(_instanceName, l->second); // Send multicast request.
#endif
+ }
_timer->schedule(ICE_SHARED_FROM_THIS, _timeout);
return;
}
@@ -737,11 +861,18 @@ LocatorI::runTimerTask()
_pendingRetryCount = 0;
}
- for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ if(_pendingRequests.empty())
+ {
+ notify();
+ }
+ else
{
- (*p)->invoke(_voidLocator); // Send pending requests on void locator.
+ for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ {
+ (*p)->invoke(_voidLocator); // Send pending requests on void locator.
+ }
+ _pendingRequests.clear();
}
- _pendingRequests.clear();
_nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires
}