summaryrefslogtreecommitdiff
path: root/cpp/src/IceLocatorDiscovery/PluginI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceLocatorDiscovery/PluginI.cpp')
-rw-r--r--cpp/src/IceLocatorDiscovery/PluginI.cpp204
1 files changed, 132 insertions, 72 deletions
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp
index ee01719fd72..112147f8978 100644
--- a/cpp/src/IceLocatorDiscovery/PluginI.cpp
+++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp
@@ -10,6 +10,7 @@
#include <IceUtil/IceUtil.h>
#include <Ice/Ice.h>
#include <Ice/Network.h> // For getInterfacesForMulticast
+#include <Ice/LoggerUtil.h>
#include <IceLocatorDiscovery/Plugin.h>
#include <IceLocatorDiscovery/IceLocatorDiscovery.h>
@@ -22,6 +23,14 @@ namespace
class LocatorI; // Forward declaration
+#ifdef ICE_CPP11_MAPPING
+typedef std::pair<function<void(bool, const pair<const Ice::Byte*, const Ice::Byte*>&)>,
+ function<void(exception_ptr)>> AMDCallback;
+#else
+typedef Ice::AMD_Object_ice_invokePtr AMDCallback;
+#endif
+
+
class Request :
#ifdef ICE_CPP11_MAPPING
public std::enable_shared_from_this<Request>
@@ -31,35 +40,18 @@ class Request :
{
public:
-#ifdef ICE_CPP11_MAPPING
- Request(LocatorI* locator,
- const string& operation,
- Ice::OperationMode mode,
- const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- const Ice::Context& ctx,
- function<void(bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> responseCB,
- function<void(exception_ptr)> exceptionCB) :
- _locator(locator),
- _operation(operation),
- _mode(mode),
- _context(ctx),
- _inParams(inParams.first, inParams.second),
- _responseCB(move(responseCB)),
- _exceptionCB(move(exceptionCB))
-#else
Request(LocatorI* locator,
const string& operation,
Ice::OperationMode mode,
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
const Ice::Context& ctx,
- const Ice::AMD_Object_ice_invokePtr& amdCB) :
+ const AMDCallback& amdCB) :
_locator(locator),
_operation(operation),
_mode(mode),
_context(ctx),
_inParams(inParams.first, inParams.second),
_amdCB(amdCB)
-#endif
{
}
@@ -74,12 +66,10 @@ protected:
const Ice::OperationMode _mode;
const Ice::Context _context;
const Ice::ByteSeq _inParams;
+ AMDCallback _amdCB;
#ifdef ICE_CPP11_MAPPING
- function<void(bool, const pair<const Ice::Byte*, const Ice::Byte*>&)> _responseCB;
- function<void(exception_ptr)> _exceptionCB;
exception_ptr _exception;
#else
- const Ice::AMD_Object_ice_invokePtr _amdCB;
IceInternal::UniquePtr<Ice::Exception> _exception;
#endif
@@ -114,11 +104,14 @@ public:
vector<Ice::LocatorPrxPtr> getLocators(const string&, const IceUtil::Time&);
+ void exception(const Ice::LocalException&);
+
private:
virtual void runTimerTask();
- vector<pair<LookupPrxPtr, LookupReplyPrxPtr> > _lookup;
+ LookupPrxPtr _lookup;
+ vector<pair<LookupPrxPtr, LookupReplyPrxPtr> > _lookups;
const IceUtil::Time _timeout;
const int _retryCount;
const IceUtil::Time _retryDelay;
@@ -132,6 +125,8 @@ private:
IceUtil::Time _nextRetry;
int _pendingRetryCount;
+ int _failureCount;
+ bool _warnOnce;
vector<RequestPtr> _pendingRequests;
};
ICE_DEFINE_PTR(LocatorIPtr, LocatorI);
@@ -229,6 +224,36 @@ private:
Ice::LocatorPrxPtr _defaultLocator;
};
+#ifndef ICE_CPP11_MAPPING
+
+class CallbackI : public IceUtil::Shared
+{
+public:
+
+ CallbackI(const LocatorIPtr& locator) : _locator(locator)
+ {
+ }
+
+ void
+ completed(const Ice::AsyncResultPtr& result)
+ {
+ try
+ {
+ result->throwLocalException();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _locator->exception(ex);
+ }
+ }
+
+private:
+
+ LocatorIPtr _locator;
+};
+
+#endif
+
}
//
@@ -417,7 +442,7 @@ Request::invoke(const Ice::LocatorPrxPtr& l)
else
{
assert(_exception); // Don't retry if the proxy didn't change
- _exceptionCB(_exception);
+ _amdCB.second(_exception);
}
#else
@@ -447,7 +472,7 @@ void
Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams)
{
#ifdef ICE_CPP11_MAPPING
- _responseCB(ok, outParams);
+ _amdCB.first(ok, outParams);
#else
_amdCB->ice_response(ok, outParams);
#endif
@@ -463,11 +488,11 @@ Request::exception(const Ice::Exception& ex)
}
catch(const Ice::RequestFailedException&)
{
- _exceptionCB(current_exception());
+ _amdCB.second(current_exception());
}
catch(const Ice::UnknownException&)
{
- _exceptionCB(current_exception());
+ _amdCB.second(current_exception());
}
catch(const Ice::NoEndpointException&)
{
@@ -477,7 +502,7 @@ Request::exception(const Ice::Exception& ex)
}
catch(...)
{
- _exceptionCB(current_exception());
+ _amdCB.second(current_exception());
}
}
catch(const Ice::CommunicatorDestroyedException&)
@@ -488,7 +513,7 @@ Request::exception(const Ice::Exception& ex)
}
catch(...)
{
- _exceptionCB(current_exception());
+ _amdCB.second(current_exception());
}
}
catch(const Ice::ObjectAdapterDeactivatedException&)
@@ -499,7 +524,7 @@ Request::exception(const Ice::Exception& ex)
}
catch(...)
{
- _exceptionCB(current_exception());
+ _amdCB.second(current_exception());
}
}
catch(const Ice::Exception&)
@@ -545,6 +570,7 @@ LocatorI::LocatorI(const string& name,
const Ice::PropertiesPtr& p,
const string& instanceName,
const Ice::LocatorPrxPtr& voidLocator) :
+ _lookup(lookup),
_timeout(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".Timeout", 300))),
_retryCount(p->getPropertyAsIntWithDefault(name + ".RetryCount", 3)),
_retryDelay(IceUtil::Time::milliSeconds(p->getPropertyAsIntWithDefault(name + ".RetryDelay", 2000))),
@@ -553,25 +579,10 @@ LocatorI::LocatorI(const string& name,
_warned(false),
_locator(lookup->ice_getCommunicator()->getDefaultLocator()),
_voidLocator(voidLocator),
- _pendingRetryCount(0)
+ _pendingRetryCount(0),
+ _failureCount(0),
+ _warnOnce(true)
{
-#ifndef ICE_CPP11_MAPPING
- __setNoDelete(true);
-#endif
- try
- {
- // Ensure we can establish a connection to the multicast proxy
- lookup->ice_getConnection();
- }
- catch(const Ice::LocalException& ex)
- {
- ostringstream os;
- os << "IceLocatorDiscovery is unable to establish a multicast connection:\n";
- os << "proxy = " << lookup << '\n';
- os << ex;
- throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
- }
-
//
// Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
// datagram on each endpoint.
@@ -579,22 +590,11 @@ LocatorI::LocatorI(const string& name,
Ice::EndpointSeq endpoints = lookup->ice_getEndpoints();
for(vector<Ice::EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
- try
- {
- Ice::EndpointSeq single;
- single.push_back(*p);
- LookupPrxPtr l = lookup->ice_endpoints(single);
- l->ice_getConnection();
- _lookup.push_back(make_pair(l, LookupReplyPrxPtr()));
- }
- catch(const Ice::LocalException&)
- {
- }
+ Ice::EndpointSeq single;
+ single.push_back(*p);
+ _lookups.push_back(make_pair(lookup->ice_endpoints(single), LookupReplyPrxPtr()));
}
- assert(!_lookup.empty());
-#ifndef ICE_CPP11_MAPPING
- __setNoDelete(false);
-#endif
+ assert(!_lookups.empty());
}
void
@@ -603,7 +603,7 @@ LocatorI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
//
// Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
//
- for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookup.begin(); p != _lookup.end(); ++p)
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookups.begin(); p != _lookups.end(); ++p)
{
Ice::UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(Ice::UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo());
if(info && !info->mcastInterface.empty())
@@ -636,7 +636,7 @@ LocatorI::ice_invokeAsync(pair<const Ice::Byte*, const Ice::Byte*> inParams,
const Ice::Current& current)
{
invoke(nullptr, make_shared<Request>(this, current.operation, current.mode, inParams, current.ctx,
- move(responseCB), move(exceptionCB)));
+ make_pair(move(responseCB), move(exceptionCB))));
}
#else
void
@@ -811,16 +811,29 @@ LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request)
if(_pendingRetryCount == 0) // No request in progress
{
+ _failureCount = 0;
_pendingRetryCount = _retryCount;
try
{
- for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin();
- l != _lookup.end(); ++l)
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookups.begin();
+ l != _lookups.end(); ++l)
{
#ifdef ICE_CPP11_MAPPING
- l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request.
+ auto self = shared_from_this();
+ l->first->findLocatorAsync(_instanceName, l->second, nullptr, [self](exception_ptr ex)
+ {
+ try
+ {
+ rethrow_exception(ex);
+ }
+ catch(const Ice::LocalException& e)
+ {
+ self->exception(e);
+ }
+ });
#else
- l->first->begin_findLocator(_instanceName, l->second); // Send multicast request.
+ l->first->begin_findLocator(_instanceName, l->second, Ice::newCallback(new CallbackI(this),
+ &CallbackI::completed));
#endif
}
_timer->schedule(ICE_SHARED_FROM_THIS, _timeout);
@@ -839,6 +852,40 @@ LocatorI::invoke(const Ice::LocatorPrxPtr& locator, const RequestPtr& request)
}
void
+LocatorI::exception(const Ice::LocalException& ex)
+{
+ Lock sync(*this);
+ if(++_failureCount == _lookups.size() && _pendingRetryCount > 0)
+ {
+ //
+ // All the lookup calls failed, cancel the timer and propagate the error to the requests.
+ //
+ _timer->cancel(ICE_SHARED_FROM_THIS);
+ _pendingRetryCount = 0;
+
+ if(_warnOnce)
+ {
+ Ice::Warning warn(_lookup->ice_getCommunicator()->getLogger());
+ warn << "failed to lookup locator with lookup proxy `" << _lookup << "':\n" << ex;
+ _warnOnce = false;
+ }
+
+ if(_pendingRequests.empty())
+ {
+ notify();
+ }
+ else
+ {
+ for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ {
+ (*p)->invoke(_voidLocator);
+ }
+ _pendingRequests.clear();
+ }
+ }
+}
+
+void
LocatorI::runTimerTask()
{
Lock sync(*this);
@@ -846,13 +893,26 @@ LocatorI::runTimerTask()
{
try
{
- for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookup.begin();
- l != _lookup.end(); ++l)
+ _failureCount = 0;
+ for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator l = _lookups.begin();
+ l != _lookups.end(); ++l)
{
#ifdef ICE_CPP11_MAPPING
- l->first->findLocatorAsync(_instanceName, l->second); // Send multicast request.
+ auto self = shared_from_this();
+ l->first->findLocatorAsync(_instanceName, l->second, nullptr, [self](exception_ptr ex)
+ {
+ try
+ {
+ rethrow_exception(ex);
+ }
+ catch(const Ice::LocalException& e)
+ {
+ self->exception(e);
+ }
+ });
#else
- l->first->begin_findLocator(_instanceName, l->second); // Send multicast request.
+ l->first->begin_findLocator(_instanceName, l->second, Ice::newCallback(new CallbackI(this),
+ &CallbackI::completed));
#endif
}
_timer->schedule(ICE_SHARED_FROM_THIS, _timeout);