summaryrefslogtreecommitdiff
path: root/csharp/src/IceDiscovery/LookupI.cs
diff options
context:
space:
mode:
Diffstat (limited to 'csharp/src/IceDiscovery/LookupI.cs')
-rw-r--r--csharp/src/IceDiscovery/LookupI.cs194
1 files changed, 137 insertions, 57 deletions
diff --git a/csharp/src/IceDiscovery/LookupI.cs b/csharp/src/IceDiscovery/LookupI.cs
index 9c4abb2d8a1..1912505f2da 100644
--- a/csharp/src/IceDiscovery/LookupI.cs
+++ b/csharp/src/IceDiscovery/LookupI.cs
@@ -15,12 +15,12 @@ namespace IceDiscovery
using System.Text;
using System.Diagnostics;
- class Request<T>
+ abstract class Request<T>
{
protected Request(LookupI lookup, T id, int retryCount)
{
lookup_ = lookup;
- nRetry_ = retryCount;
+ retryCount_ = retryCount;
_id = id;
}
@@ -37,14 +37,40 @@ namespace IceDiscovery
public virtual bool retry()
{
- return --nRetry_ >= 0;
+ return --retryCount_ >= 0;
}
+ public void invoke(String domainId, Dictionary<LookupPrx, LookupReplyPrx> lookups)
+ {
+ _lookupCount = lookups.Count;
+ _failureCount = 0;
+ foreach(var entry in lookups)
+ {
+ invokeWithLookup(domainId, entry.Key, entry.Value);
+ }
+ }
+
+ public bool exception()
+ {
+ if(++_failureCount == _lookupCount)
+ {
+ finished(null);
+ return true;
+ }
+ return false;
+ }
+
+ abstract public void finished(Ice.ObjectPrx proxy);
+
+ abstract protected void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply);
+
protected LookupI lookup_;
- protected int nRetry_;
+ protected int retryCount_;
+ protected int _lookupCount;
+ protected int _failureCount;
protected List<TaskCompletionSource<Ice.ObjectPrx>> callbacks_ = new List<TaskCompletionSource<Ice.ObjectPrx>>();
- private T _id;
+ protected T _id;
};
class AdapterRequest : Request<string>, IceInternal.TimerTask
@@ -56,7 +82,7 @@ namespace IceDiscovery
public override bool retry()
{
- return _proxies.Count == 0 && --nRetry_ >= 0;
+ return _proxies.Count == 0 && --retryCount_ >= 0;
}
public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup)
@@ -80,7 +106,7 @@ namespace IceDiscovery
return true;
}
- public void finished(Ice.ObjectPrx proxy)
+ public override void finished(Ice.ObjectPrx proxy)
{
if(proxy != null || _proxies.Count == 0)
{
@@ -111,6 +137,20 @@ namespace IceDiscovery
lookup_.adapterRequestTimedOut(this);
}
+ protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
+ {
+ lookup.findAdapterByIdAsync(domainId, _id, lookupReply).ContinueWith(task => {
+ try
+ {
+ task.Wait();
+ }
+ catch(AggregateException ex)
+ {
+ lookup_.adapterRequestException(this, ex.InnerException);
+ }
+ });
+ }
+
private void sendResponse(Ice.ObjectPrx proxy)
{
foreach(var cb in callbacks_)
@@ -136,7 +176,7 @@ namespace IceDiscovery
finished(proxy);
}
- public void finished(Ice.ObjectPrx proxy)
+ public override void finished(Ice.ObjectPrx proxy)
{
foreach(var cb in callbacks_)
{
@@ -149,6 +189,20 @@ namespace IceDiscovery
{
lookup_.objectRequestTimedOut(this);
}
+
+ protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
+ {
+ lookup.findObjectByIdAsync(domainId, _id, lookupReply).ContinueWith(task => {
+ try
+ {
+ task.Wait();
+ }
+ catch(AggregateException ex)
+ {
+ lookup_.objectRequestException(this, ex.InnerException);
+ }
+ });
+ }
};
class LookupI : LookupDisp_
@@ -156,27 +210,13 @@ namespace IceDiscovery
public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties)
{
_registry = registry;
+ _lookup = lookup;
_timeout = properties.getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300);
_retryCount = properties.getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3);
_latencyMultiplier = properties.getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1);
_domainId = properties.getProperty("IceDiscovery.DomainId");
_timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
- try
- {
- lookup.ice_getConnection();
- }
- catch(Ice.LocalException ex)
- {
- StringBuilder b = new StringBuilder();
- b.Append("IceDiscovery is unable to establish a multicast connection:\n");
- b.Append("proxy = ");
- b.Append(lookup.ToString());
- b.Append('\n');
- b.Append(ex.ToString());
- throw new Ice.PluginInitializationException(b.ToString());
- }
-
//
// Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
// datagram on each endpoint.
@@ -184,19 +224,10 @@ namespace IceDiscovery
var single = new Ice.Endpoint[1];
foreach(var endpt in lookup.ice_getEndpoints())
{
- try
- {
- single[0] = endpt;
- LookupPrx l = (LookupPrx)lookup.ice_endpoints(single);
- l.ice_getConnection();
- _lookup[(LookupPrx)lookup.ice_endpoints(single)] = null;
- }
- catch(Ice.LocalException)
- {
- // Ignore
- }
+ single[0] = endpt;
+ _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
}
- Debug.Assert(_lookup.Count > 0);
+ Debug.Assert(_lookups.Count > 0);
}
public void setLookupReply(LookupReplyPrx lookupReply)
@@ -205,7 +236,7 @@ namespace IceDiscovery
// Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
//
var single = new Ice.Endpoint[1];
- foreach(var key in new List<LookupPrx>(_lookup.Keys))
+ foreach(var key in new List<LookupPrx>(_lookups.Keys))
{
var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
if(info.mcastInterface.Length > 0)
@@ -216,15 +247,15 @@ namespace IceDiscovery
if(r is Ice.IPEndpointInfo && ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface))
{
single[0] = q;
- _lookup[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
+ _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
}
}
}
- if(_lookup[key] == null)
+ if(_lookups[key] == null)
{
// Fallback: just use the given lookup reply proxy if no matching endpoint found.
- _lookup[key] = lookupReply;
+ _lookups[key] = lookupReply;
}
}
}
@@ -296,10 +327,7 @@ namespace IceDiscovery
{
try
{
- foreach(var l in _lookup)
- {
- l.Key.findObjectByIdAsync(_domainId, id, l.Value);
- }
+ request.invoke(_domainId, _lookups);
_timer.schedule(request, _timeout);
}
catch(Ice.LocalException)
@@ -328,10 +356,7 @@ namespace IceDiscovery
{
try
{
- foreach(var l in _lookup)
- {
- l.Key.findAdapterByIdAsync(_domainId, adapterId, l.Value);
- }
+ request.invoke(_domainId, _lookups);
_timer.schedule(request, _timeout);
}
catch(Ice.LocalException)
@@ -391,10 +416,7 @@ namespace IceDiscovery
{
try
{
- foreach(var l in _lookup)
- {
- l.Key.findObjectByIdAsync(_domainId, request.getId(), l.Value);
- }
+ request.invoke(_domainId, _lookups);
_timer.schedule(request, _timeout);
return;
}
@@ -409,6 +431,36 @@ namespace IceDiscovery
}
}
+ internal void objectRequestException(ObjectRequest request, Exception ex)
+ {
+ lock(this)
+ {
+ ObjectRequest r;
+ if(!_objectRequests.TryGetValue(request.getId(), out r) || r != request)
+ {
+ return;
+ }
+
+ if(request.exception())
+ {
+ if(_warnOnce)
+ {
+ StringBuilder s = new StringBuilder();
+ s.Append("failed to lookup object `");
+ s.Append(_lookup.ice_getCommunicator().identityToString(request.getId()));
+ s.Append("' with lookup proxy `");
+ s.Append(_lookup);
+ s.Append("':\n");
+ s.Append(ex.ToString());
+ _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
+ _warnOnce = false;
+ }
+ _timer.cancel(request);
+ _objectRequests.Remove(request.getId());
+ }
+ }
+ }
+
internal void adapterRequestTimedOut(AdapterRequest request)
{
lock(this)
@@ -423,10 +475,7 @@ namespace IceDiscovery
{
try
{
- foreach(var l in _lookup)
- {
- l.Key.findAdapterByIdAsync(_domainId, request.getId(), l.Value);
- }
+ request.invoke(_domainId, _lookups);
_timer.schedule(request, _timeout);
return;
}
@@ -441,6 +490,36 @@ namespace IceDiscovery
}
}
+ internal void adapterRequestException(AdapterRequest request, Exception ex)
+ {
+ lock(this)
+ {
+ AdapterRequest r;
+ if(!_adapterRequests.TryGetValue(request.getId(), out r) || r != request)
+ {
+ return;
+ }
+
+ if(request.exception())
+ {
+ if(_warnOnce)
+ {
+ StringBuilder s = new StringBuilder();
+ s.Append("failed to lookup adapter `");
+ s.Append(request.getId());
+ s.Append("' with lookup proxy `");
+ s.Append(_lookup);
+ s.Append("':\n");
+ s.Append(ex.ToString());
+ _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
+ _warnOnce = false;
+ }
+ _timer.cancel(request);
+ _adapterRequests.Remove(request.getId());
+ }
+ }
+ }
+
internal IceInternal.Timer timer()
{
return _timer;
@@ -452,14 +531,15 @@ namespace IceDiscovery
}
private LocatorRegistryI _registry;
- private Dictionary<LookupPrx, LookupReplyPrx> _lookup = new Dictionary<LookupPrx, LookupReplyPrx>();
+ private LookupPrx _lookup;
+ private Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
private readonly int _timeout;
private readonly int _retryCount;
private readonly int _latencyMultiplier;
private readonly string _domainId;
private IceInternal.Timer _timer;
-
+ private bool _warnOnce = true;
private Dictionary<Ice.Identity, ObjectRequest> _objectRequests = new Dictionary<Ice.Identity, ObjectRequest>();
private Dictionary<string, AdapterRequest> _adapterRequests = new Dictionary<string, AdapterRequest>();
};