diff options
Diffstat (limited to 'java')
4 files changed, 305 insertions, 90 deletions
diff --git a/java/src/IceDiscovery/src/main/java/com/zeroc/IceDiscovery/LookupI.java b/java/src/IceDiscovery/src/main/java/com/zeroc/IceDiscovery/LookupI.java index ac99857d3ae..0de78d3215b 100644 --- a/java/src/IceDiscovery/src/main/java/com/zeroc/IceDiscovery/LookupI.java +++ b/java/src/IceDiscovery/src/main/java/com/zeroc/IceDiscovery/LookupI.java @@ -22,7 +22,7 @@ class LookupI implements Lookup Request(T id, int retryCount) { _id = id; - _nRetry = retryCount; + _retryCount = retryCount; } T getId() @@ -38,7 +38,27 @@ class LookupI implements Lookup boolean retry() { - return --_nRetry >= 0; + return --_retryCount >= 0; + } + + void invoke(String domainId, Map<LookupPrx, LookupReplyPrx> lookups) + { + _lookupCount = lookups.size(); + _failureCount = 0; + for(Map.Entry<LookupPrx, LookupReplyPrx> entry : lookups.entrySet()) + { + invokeWithLookup(domainId, entry.getKey(), entry.getValue()); + } + } + + boolean exception() + { + if(++_failureCount == _lookupCount) + { + finished(null); + return true; + } + return false; } void scheduleTimer(long timeout) @@ -53,9 +73,15 @@ class LookupI implements Lookup _future = null; } - protected int _nRetry; + abstract void finished(com.zeroc.Ice.ObjectPrx proxy); + + abstract protected void invokeWithLookup(String domainId, LookupPrx lookup, LookupReplyPrx lookupReply); + + protected int _retryCount; + protected int _lookupCount; + protected int _failureCount; protected List<CompletableFuture<Ret>> _futures = new ArrayList<>(); - private T _id; + protected T _id; protected java.util.concurrent.Future<?> _future; } @@ -71,7 +97,7 @@ class LookupI implements Lookup @Override boolean retry() { - return _proxies.size() == 0 && --_nRetry >= 0; + return _proxies.size() == 0 && --_retryCount >= 0; } boolean response(com.zeroc.Ice.ObjectPrx proxy, boolean isReplicaGroup) @@ -95,6 +121,7 @@ class LookupI implements Lookup return true; } + @Override void finished(com.zeroc.Ice.ObjectPrx proxy) { if(proxy != null || _proxies.isEmpty()) @@ -127,6 +154,17 @@ class LookupI implements Lookup adapterRequestTimedOut(this); } + @Override + protected void invokeWithLookup(String domainId, LookupPrx lookup, LookupReplyPrx lookupReply) + { + lookup.findAdapterByIdAsync(domainId, _id, lookupReply).whenComplete((v, ex) -> { + if(ex != null) + { + adapterRequestException(AdapterRequest.this, ex); + } + }); + } + private void sendResponse(com.zeroc.Ice.ObjectPrx proxy) { for(CompletableFuture<com.zeroc.Ice.ObjectPrx> f : _futures) @@ -153,6 +191,7 @@ class LookupI implements Lookup finished(proxy); } + @Override void finished(com.zeroc.Ice.ObjectPrx proxy) { for(CompletableFuture<com.zeroc.Ice.ObjectPrx> f : _futures) @@ -167,51 +206,36 @@ class LookupI implements Lookup { objectRequestTimedOut(this); } + + @Override + protected void invokeWithLookup(String domainId, LookupPrx lookup, LookupReplyPrx lookupReply) + { + lookup.findObjectByIdAsync(domainId, _id, lookupReply).whenComplete((v, ex) -> { + if(ex != null) + { + objectRequestException(ObjectRequest.this, ex); + } + }); + } } public LookupI(LocatorRegistryI registry, LookupPrx lookup, com.zeroc.Ice.Properties properties) { _registry = registry; + _lookup = lookup; _timeout = properties.getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300); _retryCount = properties.getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3); _latencyMultiplier = properties.getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1); _domainId = properties.getProperty("IceDiscovery.DomainId"); _timer = com.zeroc.IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer(); - try - { - lookup.ice_getConnection(); - } - catch(com.zeroc.Ice.LocalException ex) - { - StringBuilder b = new StringBuilder(); - b.append("IceDiscovery is unable to establish a multicast connection:\n"); - b.append("proxy = "); - b.append(lookup.toString()); - b.append('\n'); - b.append(ex.toString()); - throw new com.zeroc.Ice.PluginInitializationException(b.toString()); - } - - // - // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast - // datagram on each endpoint. - // com.zeroc.Ice.Endpoint[] single = new com.zeroc.Ice.Endpoint[1]; for(com.zeroc.Ice.Endpoint endpt : lookup.ice_getEndpoints()) { - try - { - single[0] = endpt; - LookupPrx l = (LookupPrx)lookup.ice_endpoints(single); - l.ice_getConnection(); - _lookup.put(l, null); - } - catch(com.zeroc.Ice.LocalException ex) - { - } + single[0] = endpt; + _lookups.put((LookupPrx)lookup.ice_endpoints(single), null); } - assert(!_lookup.isEmpty()); + assert(!_lookups.isEmpty()); } void setLookupReply(LookupReplyPrx lookupReply) @@ -220,7 +244,7 @@ class LookupI implements Lookup // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams. // com.zeroc.Ice.Endpoint[] single = new com.zeroc.Ice.Endpoint[1]; - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) + for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookups.entrySet()) { com.zeroc.Ice.UDPEndpointInfo info = (com.zeroc.Ice.UDPEndpointInfo)entry.getKey().ice_getEndpoints()[0].getInfo(); @@ -311,10 +335,7 @@ class LookupI implements Lookup { try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) - { - entry.getKey().findObjectByIdAsync(_domainId, id, entry.getValue()); - } + request.invoke(_domainId, _lookups); request.scheduleTimer(_timeout); } catch(com.zeroc.Ice.LocalException ex) @@ -338,10 +359,7 @@ class LookupI implements Lookup { try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) - { - entry.getKey().findAdapterByIdAsync(_domainId, adapterId, entry.getValue()); - } + request.invoke(_domainId, _lookups); request.scheduleTimer(_timeout); } catch(com.zeroc.Ice.LocalException ex) @@ -392,10 +410,7 @@ class LookupI implements Lookup { try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) - { - entry.getKey().findObjectByIdAsync(_domainId, request.getId(), entry.getValue()); - } + request.invoke(_domainId, _lookups); request.scheduleTimer(_timeout); return; } @@ -408,6 +423,34 @@ class LookupI implements Lookup _objectRequests.remove(request.getId()); } + + synchronized void objectRequestException(ObjectRequest request, Throwable ex) + { + ObjectRequest r = _objectRequests.get(request.getId()); + if(r == null || r != request) + { + return; + } + + if(request.exception()) + { + if(_warnOnce) + { + StringBuilder s = new StringBuilder(); + s.append("failed to lookup object `"); + s.append(_lookup.ice_getCommunicator().identityToString(request.getId())); + s.append("' with lookup proxy `"); + s.append(_lookup); + s.append("':\n"); + s.append(ex.toString()); + _lookup.ice_getCommunicator().getLogger().warning(s.toString()); + _warnOnce = false; + } + request.cancelTimer(); + _objectRequests.remove(request.getId()); + } + } + synchronized void adapterRequestTimedOut(AdapterRequest request) { AdapterRequest r = _adapterRequests.get(request.getId()); @@ -420,10 +463,7 @@ class LookupI implements Lookup { try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) - { - entry.getKey().findAdapterByIdAsync(_domainId, request.getId(), entry.getValue()); - } + request.invoke(_domainId, _lookups); request.scheduleTimer(_timeout); return; } @@ -436,14 +476,43 @@ class LookupI implements Lookup _adapterRequests.remove(request.getId()); } + synchronized void adapterRequestException(AdapterRequest request, Throwable ex) + { + AdapterRequest r = _adapterRequests.get(request.getId()); + if(r == null || r != request) + { + return; + } + + if(request.exception()) + { + if(_warnOnce) + { + StringBuilder s = new StringBuilder(); + s.append("failed to lookup adapter `"); + s.append(request.getId()); + s.append("' with lookup proxy `"); + s.append(_lookup); + s.append("':\n"); + s.append(ex.toString()); + _lookup.ice_getCommunicator().getLogger().warning(s.toString()); + _warnOnce = false; + } + request.cancelTimer(); + _adapterRequests.remove(request.getId()); + } + } + private LocatorRegistryI _registry; - private java.util.Map<LookupPrx, LookupReplyPrx> _lookup = new java.util.HashMap<>(); + private LookupPrx _lookup; + private java.util.Map<LookupPrx, LookupReplyPrx> _lookups = new java.util.HashMap<>(); private final int _timeout; private final int _retryCount; private final int _latencyMultiplier; private final String _domainId; private final java.util.concurrent.ScheduledExecutorService _timer; + private boolean _warnOnce = true; private Map<com.zeroc.Ice.Identity, ObjectRequest> _objectRequests = new HashMap<>(); private Map<String, AdapterRequest> _adapterRequests = new HashMap<>(); diff --git a/java/src/IceLocatorDiscovery/src/main/java/com/zeroc/IceLocatorDiscovery/PluginI.java b/java/src/IceLocatorDiscovery/src/main/java/com/zeroc/IceLocatorDiscovery/PluginI.java index 61dd6f7cca1..4001913fc2e 100644 --- a/java/src/IceLocatorDiscovery/src/main/java/com/zeroc/IceLocatorDiscovery/PluginI.java +++ b/java/src/IceLocatorDiscovery/src/main/java/com/zeroc/IceLocatorDiscovery/PluginI.java @@ -141,6 +141,7 @@ class PluginI implements Plugin LocatorI(String name, LookupPrx lookup, com.zeroc.Ice.Properties properties, String instanceName, com.zeroc.Ice.LocatorPrx voidLocator) { + _lookup = lookup; _timeout = properties.getPropertyAsIntWithDefault(name + ".Timeout", 300); _retryCount = properties.getPropertyAsIntWithDefault(name + ".RetryCount", 3); _retryDelay = properties.getPropertyAsIntWithDefault(name + ".RetryDelay", 2000); @@ -150,21 +151,8 @@ class PluginI implements Plugin _locator = lookup.ice_getCommunicator().getDefaultLocator(); _voidLocator = voidLocator; _pendingRetryCount = 0; - - try - { - lookup.ice_getConnection(); - } - catch(com.zeroc.Ice.LocalException ex) - { - StringBuilder b = new StringBuilder(); - b.append("IceDiscovery is unable to establish a multicast connection:\n"); - b.append("proxy = "); - b.append(lookup.toString()); - b.append('\n'); - b.append(ex.toString()); - throw new com.zeroc.Ice.PluginInitializationException(b.toString()); - } + _failureCount = 0; + _warnOnce = true; // // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast @@ -173,18 +161,10 @@ class PluginI implements Plugin com.zeroc.Ice.Endpoint[] single = new com.zeroc.Ice.Endpoint[1]; for(com.zeroc.Ice.Endpoint endpt : lookup.ice_getEndpoints()) { - try - { - single[0] = endpt; - LookupPrx l = (LookupPrx)lookup.ice_endpoints(single); - l.ice_getConnection(); - _lookup.put(l, null); - } - catch(com.zeroc.Ice.LocalException ex) - { - } + single[0] = endpt; + _lookups.put((LookupPrx)lookup.ice_endpoints(single), null); } - assert(!_lookup.isEmpty()); + assert(!_lookups.isEmpty()); } public void setLookupReply(LookupReplyPrx lookupReply) @@ -193,7 +173,7 @@ class PluginI implements Plugin // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams. // com.zeroc.Ice.Endpoint[] single = new com.zeroc.Ice.Endpoint[1]; - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) + for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookups.entrySet()) { com.zeroc.Ice.UDPEndpointInfo info = (com.zeroc.Ice.UDPEndpointInfo)entry.getKey().ice_getEndpoints()[0].getInfo(); @@ -228,8 +208,7 @@ class PluginI implements Plugin return f; } - public List<com.zeroc.Ice.LocatorPrx> - getLocators(String instanceName, int waitTime) + public List<com.zeroc.Ice.LocatorPrx> getLocators(String instanceName, int waitTime) { // // Clear locators from previous search. @@ -398,12 +377,18 @@ class PluginI implements Plugin if(_pendingRetryCount == 0) // No request in progress { + _failureCount = 0; _pendingRetryCount = _retryCount; try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) + for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookups.entrySet()) { - entry.getKey().findLocatorAsync(_instanceName, entry.getValue()); // Send multicast request. + entry.getKey().findLocatorAsync(_instanceName, entry.getValue()).whenComplete((v, ex) -> { + if(ex != null) + { + exception(ex); + } + }); // Send multicast request. } _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); } @@ -420,6 +405,44 @@ class PluginI implements Plugin } } + synchronized void exception(Throwable ex) + { + if(++_failureCount == _lookups.size() && _pendingRetryCount > 0) + { + // + // All the lookup calls failed, cancel the timer and propagate the error to the requests. + // + _future.cancel(false); + _future = null; + + _pendingRetryCount = 0; + + if(_warnOnce) + { + StringBuilder builder = new StringBuilder(); + builder.append("failed to lookup locator with lookup proxy `"); + builder.append(_lookup); + builder.append("':\n"); + builder.append(ex); + _lookup.ice_getCommunicator().getLogger().warning(builder.toString()); + _warnOnce = false; + } + + if(_pendingRequests.isEmpty()) + { + notify(); + } + else + { + for(Request req : _pendingRequests) + { + req.invoke(_voidLocator); + } + _pendingRequests.clear(); + } + } + } + private Runnable _retryTask = new Runnable() { @Override @@ -431,9 +454,15 @@ class PluginI implements Plugin { try { - for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookup.entrySet()) + _failureCount = 0; + for(Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookups.entrySet()) { - entry.getKey().findLocatorAsync(_instanceName, entry.getValue()); // Send multicast request + entry.getKey().findLocatorAsync(_instanceName, entry.getValue()).whenComplete((v, ex) -> { + if(ex != null) + { + exception(ex); + } + }); // Send multicast request. } _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); return; @@ -444,18 +473,26 @@ class PluginI implements Plugin _pendingRetryCount = 0; } - for(Request req : _pendingRequests) + if(_pendingRequests.isEmpty()) { - req.invoke(_voidLocator); + notify(); + } + else + { + for(Request req : _pendingRequests) + { + req.invoke(_voidLocator); + } + _pendingRequests.clear(); } - _pendingRequests.clear(); _nextRetry = com.zeroc.IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay; } } }; - private final Map<LookupPrx, LookupReplyPrx> _lookup = new java.util.HashMap<>(); + private final LookupPrx _lookup; + private final Map<LookupPrx, LookupReplyPrx> _lookups = new java.util.HashMap<>(); private final int _timeout; private java.util.concurrent.Future<?> _future; private final java.util.concurrent.ScheduledExecutorService _timer; @@ -469,6 +506,8 @@ class PluginI implements Plugin private Map<String, com.zeroc.Ice.LocatorPrx> _locators = new HashMap<>(); private int _pendingRetryCount; + private int _failureCount; + private boolean _warnOnce; private List<Request> _pendingRequests = new ArrayList<>(); private long _nextRetry; } diff --git a/java/test/src/main/java/test/IceDiscovery/simple/AllTests.java b/java/test/src/main/java/test/IceDiscovery/simple/AllTests.java index 820a6ad0bd9..f43e44dd761 100644 --- a/java/test/src/main/java/test/IceDiscovery/simple/AllTests.java +++ b/java/test/src/main/java/test/IceDiscovery/simple/AllTests.java @@ -209,6 +209,55 @@ public class AllTests } System.out.println("ok"); + System.out.print("testing invalid lookup endpoints... "); + System.out.flush(); + { + String multicast; + if(communicator.getProperties().getProperty("Ice.IPv6").equals("1")) + { + multicast = "\"ff15::1\""; + } + else + { + multicast = "239.255.0.1"; + } + + { + com.zeroc.Ice.InitializationData initData = new com.zeroc.Ice.InitializationData(); + initData.properties = communicator.getProperties()._clone(); + initData.properties.setProperty("IceDiscovery.Lookup", "udp -h " + multicast + " --interface unknown"); + com.zeroc.Ice.Communicator comm = com.zeroc.Ice.Util.initialize(initData); + test(comm.getDefaultLocator() != null); + try + { + comm.stringToProxy("controller0@control0").ice_ping(); + test(false); + } + catch(com.zeroc.Ice.LocalException ex) + { + } + comm.destroy(); + } + { + com.zeroc.Ice.InitializationData initData = new com.zeroc.Ice.InitializationData(); + initData.properties = communicator.getProperties()._clone(); + String intf = initData.properties.getProperty("IceDiscovery.Interface"); + if(!intf.isEmpty()) + { + intf = " --interface \"" + intf + "\""; + } + String port = initData.properties.getProperty("IceDiscovery.Port"); + initData.properties.setProperty("IceDiscovery.Lookup", + "udp -h " + multicast + " --interface unknown:" + + "udp -h " + multicast + " -p " + port + intf); + com.zeroc.Ice.Communicator comm = com.zeroc.Ice.Util.initialize(initData); + test(comm.getDefaultLocator() != null); + comm.stringToProxy("controller0@control0").ice_ping(); + comm.destroy(); + } + } + System.out.println("ok"); + System.out.print("shutting down... "); System.out.flush(); for(ControllerPrx prx : proxies) diff --git a/java/test/src/main/java/test/IceGrid/simple/AllTests.java b/java/test/src/main/java/test/IceGrid/simple/AllTests.java index 1c9e28cccc2..4e3fb2335b2 100644 --- a/java/test/src/main/java/test/IceGrid/simple/AllTests.java +++ b/java/test/src/main/java/test/IceGrid/simple/AllTests.java @@ -144,6 +144,64 @@ public class AllTests adapter.deactivate(); comm.destroy(); + + String multicast; + if(communicator.getProperties().getProperty("Ice.IPv6").equals("1")) + { + multicast = "\"ff15::1\""; + } + else + { + multicast = "239.255.0.1"; + } + + // + // Test invalid lookup endpoints + // + initData.properties = communicator.getProperties()._clone(); + initData.properties.setProperty("Ice.Default.Locator", ""); + initData.properties.setProperty("Ice.Plugin.IceLocatorDiscovery", + "com.zeroc.IceLocatorDiscovery.PluginFactory"); + initData.properties.setProperty("IceLocatorDiscovery.Lookup", + "udp -h " + multicast + " --interface unknown"); + comm = com.zeroc.Ice.Util.initialize(initData); + test(comm.getDefaultLocator() != null); + try + { + comm.stringToProxy("test @ TestAdapter").ice_ping(); + test(false); + } + catch(com.zeroc.Ice.NoEndpointException ex) + { + } + comm.destroy(); + + initData.properties = communicator.getProperties()._clone(); + initData.properties.setProperty("Ice.Default.Locator", ""); + initData.properties.setProperty("Ice.Plugin.IceLocatorDiscovery", + "com.zeroc.IceLocatorDiscovery.PluginFactory"); + { + String intf = initData.properties.getProperty("IceLocatorDiscovery.Interface"); + if(!intf.isEmpty()) + { + intf = " --interface \"" + intf + "\""; + } + String port = Integer.toString(app.getTestPort(99)); + initData.properties.setProperty("IceLocatorDiscovery.Lookup", + "udp -h " + multicast + " --interface unknown:" + + "udp -h " + multicast + " -p " + port + intf); + } + comm = com.zeroc.Ice.Util.initialize(initData); + test(comm.getDefaultLocator() != null); + try + { + comm.stringToProxy("test @ TestAdapter").ice_ping(); + } + catch(com.zeroc.Ice.NoEndpointException ex) + { + test(false); + } + comm.destroy(); } out.println("ok"); |