diff options
Diffstat (limited to 'csharp/src/IceDiscovery/LookupI.cs')
-rw-r--r-- | csharp/src/IceDiscovery/LookupI.cs | 194 |
1 files changed, 137 insertions, 57 deletions
diff --git a/csharp/src/IceDiscovery/LookupI.cs b/csharp/src/IceDiscovery/LookupI.cs index 9c4abb2d8a1..1912505f2da 100644 --- a/csharp/src/IceDiscovery/LookupI.cs +++ b/csharp/src/IceDiscovery/LookupI.cs @@ -15,12 +15,12 @@ namespace IceDiscovery using System.Text; using System.Diagnostics; - class Request<T> + abstract class Request<T> { protected Request(LookupI lookup, T id, int retryCount) { lookup_ = lookup; - nRetry_ = retryCount; + retryCount_ = retryCount; _id = id; } @@ -37,14 +37,40 @@ namespace IceDiscovery public virtual bool retry() { - return --nRetry_ >= 0; + return --retryCount_ >= 0; } + public void invoke(String domainId, Dictionary<LookupPrx, LookupReplyPrx> lookups) + { + _lookupCount = lookups.Count; + _failureCount = 0; + foreach(var entry in lookups) + { + invokeWithLookup(domainId, entry.Key, entry.Value); + } + } + + public bool exception() + { + if(++_failureCount == _lookupCount) + { + finished(null); + return true; + } + return false; + } + + abstract public void finished(Ice.ObjectPrx proxy); + + abstract protected void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply); + protected LookupI lookup_; - protected int nRetry_; + protected int retryCount_; + protected int _lookupCount; + protected int _failureCount; protected List<TaskCompletionSource<Ice.ObjectPrx>> callbacks_ = new List<TaskCompletionSource<Ice.ObjectPrx>>(); - private T _id; + protected T _id; }; class AdapterRequest : Request<string>, IceInternal.TimerTask @@ -56,7 +82,7 @@ namespace IceDiscovery public override bool retry() { - return _proxies.Count == 0 && --nRetry_ >= 0; + return _proxies.Count == 0 && --retryCount_ >= 0; } public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup) @@ -80,7 +106,7 @@ namespace IceDiscovery return true; } - public void finished(Ice.ObjectPrx proxy) + public override void finished(Ice.ObjectPrx proxy) { if(proxy != null || _proxies.Count == 0) { @@ -111,6 +137,20 @@ namespace IceDiscovery lookup_.adapterRequestTimedOut(this); } + protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply) + { + lookup.findAdapterByIdAsync(domainId, _id, lookupReply).ContinueWith(task => { + try + { + task.Wait(); + } + catch(AggregateException ex) + { + lookup_.adapterRequestException(this, ex.InnerException); + } + }); + } + private void sendResponse(Ice.ObjectPrx proxy) { foreach(var cb in callbacks_) @@ -136,7 +176,7 @@ namespace IceDiscovery finished(proxy); } - public void finished(Ice.ObjectPrx proxy) + public override void finished(Ice.ObjectPrx proxy) { foreach(var cb in callbacks_) { @@ -149,6 +189,20 @@ namespace IceDiscovery { lookup_.objectRequestTimedOut(this); } + + protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply) + { + lookup.findObjectByIdAsync(domainId, _id, lookupReply).ContinueWith(task => { + try + { + task.Wait(); + } + catch(AggregateException ex) + { + lookup_.objectRequestException(this, ex.InnerException); + } + }); + } }; class LookupI : LookupDisp_ @@ -156,27 +210,13 @@ namespace IceDiscovery public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties) { _registry = registry; + _lookup = lookup; _timeout = properties.getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300); _retryCount = properties.getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3); _latencyMultiplier = properties.getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1); _domainId = properties.getProperty("IceDiscovery.DomainId"); _timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer(); - try - { - lookup.ice_getConnection(); - } - catch(Ice.LocalException ex) - { - StringBuilder b = new StringBuilder(); - b.Append("IceDiscovery is unable to establish a multicast connection:\n"); - b.Append("proxy = "); - b.Append(lookup.ToString()); - b.Append('\n'); - b.Append(ex.ToString()); - throw new Ice.PluginInitializationException(b.ToString()); - } - // // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast // datagram on each endpoint. @@ -184,19 +224,10 @@ namespace IceDiscovery var single = new Ice.Endpoint[1]; foreach(var endpt in lookup.ice_getEndpoints()) { - try - { - single[0] = endpt; - LookupPrx l = (LookupPrx)lookup.ice_endpoints(single); - l.ice_getConnection(); - _lookup[(LookupPrx)lookup.ice_endpoints(single)] = null; - } - catch(Ice.LocalException) - { - // Ignore - } + single[0] = endpt; + _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null; } - Debug.Assert(_lookup.Count > 0); + Debug.Assert(_lookups.Count > 0); } public void setLookupReply(LookupReplyPrx lookupReply) @@ -205,7 +236,7 @@ namespace IceDiscovery // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams. // var single = new Ice.Endpoint[1]; - foreach(var key in new List<LookupPrx>(_lookup.Keys)) + foreach(var key in new List<LookupPrx>(_lookups.Keys)) { var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo(); if(info.mcastInterface.Length > 0) @@ -216,15 +247,15 @@ namespace IceDiscovery if(r is Ice.IPEndpointInfo && ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface)) { single[0] = q; - _lookup[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single); + _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single); } } } - if(_lookup[key] == null) + if(_lookups[key] == null) { // Fallback: just use the given lookup reply proxy if no matching endpoint found. - _lookup[key] = lookupReply; + _lookups[key] = lookupReply; } } } @@ -296,10 +327,7 @@ namespace IceDiscovery { try { - foreach(var l in _lookup) - { - l.Key.findObjectByIdAsync(_domainId, id, l.Value); - } + request.invoke(_domainId, _lookups); _timer.schedule(request, _timeout); } catch(Ice.LocalException) @@ -328,10 +356,7 @@ namespace IceDiscovery { try { - foreach(var l in _lookup) - { - l.Key.findAdapterByIdAsync(_domainId, adapterId, l.Value); - } + request.invoke(_domainId, _lookups); _timer.schedule(request, _timeout); } catch(Ice.LocalException) @@ -391,10 +416,7 @@ namespace IceDiscovery { try { - foreach(var l in _lookup) - { - l.Key.findObjectByIdAsync(_domainId, request.getId(), l.Value); - } + request.invoke(_domainId, _lookups); _timer.schedule(request, _timeout); return; } @@ -409,6 +431,36 @@ namespace IceDiscovery } } + internal void objectRequestException(ObjectRequest request, Exception ex) + { + lock(this) + { + ObjectRequest r; + if(!_objectRequests.TryGetValue(request.getId(), out r) || r != request) + { + return; + } + + if(request.exception()) + { + if(_warnOnce) + { + StringBuilder s = new StringBuilder(); + s.Append("failed to lookup object `"); + s.Append(_lookup.ice_getCommunicator().identityToString(request.getId())); + s.Append("' with lookup proxy `"); + s.Append(_lookup); + s.Append("':\n"); + s.Append(ex.ToString()); + _lookup.ice_getCommunicator().getLogger().warning(s.ToString()); + _warnOnce = false; + } + _timer.cancel(request); + _objectRequests.Remove(request.getId()); + } + } + } + internal void adapterRequestTimedOut(AdapterRequest request) { lock(this) @@ -423,10 +475,7 @@ namespace IceDiscovery { try { - foreach(var l in _lookup) - { - l.Key.findAdapterByIdAsync(_domainId, request.getId(), l.Value); - } + request.invoke(_domainId, _lookups); _timer.schedule(request, _timeout); return; } @@ -441,6 +490,36 @@ namespace IceDiscovery } } + internal void adapterRequestException(AdapterRequest request, Exception ex) + { + lock(this) + { + AdapterRequest r; + if(!_adapterRequests.TryGetValue(request.getId(), out r) || r != request) + { + return; + } + + if(request.exception()) + { + if(_warnOnce) + { + StringBuilder s = new StringBuilder(); + s.Append("failed to lookup adapter `"); + s.Append(request.getId()); + s.Append("' with lookup proxy `"); + s.Append(_lookup); + s.Append("':\n"); + s.Append(ex.ToString()); + _lookup.ice_getCommunicator().getLogger().warning(s.ToString()); + _warnOnce = false; + } + _timer.cancel(request); + _adapterRequests.Remove(request.getId()); + } + } + } + internal IceInternal.Timer timer() { return _timer; @@ -452,14 +531,15 @@ namespace IceDiscovery } private LocatorRegistryI _registry; - private Dictionary<LookupPrx, LookupReplyPrx> _lookup = new Dictionary<LookupPrx, LookupReplyPrx>(); + private LookupPrx _lookup; + private Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>(); private readonly int _timeout; private readonly int _retryCount; private readonly int _latencyMultiplier; private readonly string _domainId; private IceInternal.Timer _timer; - + private bool _warnOnce = true; private Dictionary<Ice.Identity, ObjectRequest> _objectRequests = new Dictionary<Ice.Identity, ObjectRequest>(); private Dictionary<string, AdapterRequest> _adapterRequests = new Dictionary<string, AdapterRequest>(); }; |