// Copyright (c) ZeroC, Inc. All rights reserved.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ZeroC.Ice.Discovery
{
/// Servant class that implements the Slice interface Ice::Locator.
internal class Locator : IAsyncLocator
{
internal ILocatorPrx Proxy { get; }
private readonly string _domainId;
private readonly int _latencyMultiplier;
private readonly ObjectAdapter _locatorAdapter;
private readonly ILookupPrx _lookup;
// The key is a single-endpoint datagram Lookup proxy extracted from the _lookup proxy.
// The value is a dummy datagram proxy with usually a single endpoint that is one of _replyAdapter's endpoints
// and that matches the interface of the key's endpoint.
private readonly Dictionary _lookups = new();
private readonly ObjectAdapter _multicastAdapter;
private readonly ILocatorRegistryPrx _registry;
private readonly ObjectAdapter _replyAdapter;
private readonly int _retryCount;
private readonly TimeSpan _timeout;
public async ValueTask FindAdapterByIdAsync(
string adapterId,
Current current,
CancellationToken cancel)
{
using var replyServant = new FindAdapterByIdReply(_replyAdapter);
return await InvokeAsync(
(lookup, dummyReply) =>
{
IFindAdapterByIdReplyPrx reply =
dummyReply.Clone(IFindAdapterByIdReplyPrx.Factory, identity: replyServant.Identity);
return lookup.FindAdapterByIdAsync(_domainId,
adapterId,
reply,
cancel: cancel);
},
replyServant).ConfigureAwait(false);
}
public async ValueTask FindObjectByIdAsync(
Identity identity,
string? facet,
Current current,
CancellationToken cancel)
{
using var replyServant = new FindObjectByIdReply(_replyAdapter);
return await InvokeAsync(
(lookup, dummyReply) =>
{
IFindObjectByIdReplyPrx reply =
dummyReply.Clone(IFindObjectByIdReplyPrx.Factory, identity: replyServant.Identity);
return lookup.FindObjectByIdAsync(_domainId, identity, facet, reply, cancel: cancel);
},
replyServant).ConfigureAwait(false);
}
public ValueTask GetRegistryAsync(Current current, CancellationToken cancel) =>
new(_registry);
public async ValueTask> ResolveLocationAsync(
string[] location,
Current current,
CancellationToken cancel)
{
if (location.Length == 0)
{
throw new InvalidArgumentException("location cannot be empty", nameof(location));
}
else if (location.Length > 1)
{
// Ice discovery supports only single-segment locations.
return ImmutableArray.Empty;
}
string adapterId = location[0];
using var replyServant = new ResolveAdapterIdReply(_replyAdapter);
return await InvokeAsync(
(lookup, dummyReply) =>
{
IResolveAdapterIdReplyPrx reply =
dummyReply.Clone(IResolveAdapterIdReplyPrx.Factory, identity: replyServant.Identity);
return lookup.ResolveAdapterIdAsync(_domainId,
adapterId,
reply,
cancel: cancel);
},
replyServant).ConfigureAwait(false);
}
public async ValueTask<(IEnumerable, IEnumerable)> ResolveWellKnownProxyAsync(
Identity identity,
string facet,
Current current,
CancellationToken cancel)
{
using var replyServant = new ResolveWellKnownProxyReply(_replyAdapter);
string adapterId = await InvokeAsync(
(lookup, dummyReply) =>
{
IResolveWellKnownProxyReplyPrx reply =
dummyReply.Clone(IResolveWellKnownProxyReplyPrx.Factory, identity: replyServant.Identity);
return lookup.ResolveWellKnownProxyAsync(_domainId,
identity,
facet,
reply,
cancel: cancel);
},
replyServant).ConfigureAwait(false);
// We never return endpoints
return (ImmutableArray.Empty,
adapterId.Length > 0 ? ImmutableArray.Create(adapterId) : ImmutableArray.Empty);
}
internal Locator(Communicator communicator)
{
const string defaultIPv4Endpoint = "udp -h 239.255.0.1 -p 4061";
const string defaultIPv6Endpoint = "udp -h \"ff15::1\" -p 4061";
if (communicator.GetProperty("Ice.Discovery.Multicast.Endpoints") == null)
{
communicator.SetProperty("Ice.Discovery.Multicast.Endpoints",
$"{defaultIPv4Endpoint}:{defaultIPv6Endpoint}");
}
communicator.SetProperty("Ice.Discovery.Multicast.AcceptNonSecure", "Always");
if (communicator.GetProperty("Ice.Discovery.Reply.Endpoints") == null)
{
communicator.SetProperty("Ice.Discovery.Reply.Endpoints", "udp -h \"::0\" -p 0");
}
// create datagram proxies
communicator.SetProperty("Ice.Discovery.Reply.ProxyOptions", "-d");
// datagram connection are nonsecure
communicator.SetProperty("Ice.Discovery.Reply.AcceptNonSecure", "Always");
_timeout = communicator.GetPropertyAsTimeSpan("Ice.Discovery.Timeout") ?? TimeSpan.FromMilliseconds(300);
if (_timeout == Timeout.InfiniteTimeSpan)
{
_timeout = TimeSpan.FromMilliseconds(300);
}
_retryCount = communicator.GetPropertyAsInt("Ice.Discovery.RetryCount") ?? 3;
_latencyMultiplier = communicator.GetPropertyAsInt("Ice.Discovery.LatencyMultiplier") ?? 1;
if (_latencyMultiplier < 1)
{
throw new InvalidConfigurationException(
"the value of Ice.Discovery.LatencyMultiplier must be an integer greater than 0");
}
_domainId = communicator.GetProperty("Ice.Discovery.DomainId") ?? "";
string? lookupEndpoints = communicator.GetProperty("Ice.Discovery.Lookup");
if (lookupEndpoints == null)
{
var endpoints = new List();
List ipv4Interfaces = Network.GetInterfacesForMulticast("0.0.0.0", Network.EnableIPv4);
List ipv6Interfaces = Network.GetInterfacesForMulticast("::0", Network.EnableIPv6);
endpoints.AddRange(ipv4Interfaces.Select(i => $"{defaultIPv4Endpoint} --interface \"{i}\""));
endpoints.AddRange(ipv6Interfaces.Select(i => $"{defaultIPv6Endpoint} --interface \"{i}\""));
lookupEndpoints = string.Join(":", endpoints);
}
// Datagram proxies do not support SSL/TLS so they can only be used with PreferNonSecure set to
// NonSecure.Always.
_lookup = ILookupPrx.Parse($"IceDiscovery/Lookup -d:{lookupEndpoints}", communicator).Clone(
clearRouter: true,
invocationTimeout: _timeout,
preferNonSecure: NonSecure.Always);
_locatorAdapter = communicator.CreateObjectAdapter();
Proxy = _locatorAdapter.AddWithUUID(this, ILocatorPrx.Factory);
// Setup locator registry.
var registryServant = new LocatorRegistry(communicator);
_registry = _locatorAdapter.AddWithUUID(registryServant, ILocatorRegistryPrx.Factory);
_multicastAdapter = communicator.CreateObjectAdapter("Ice.Discovery.Multicast");
_replyAdapter = communicator.CreateObjectAdapter("Ice.Discovery.Reply");
// Dummy proxy for replies which can have multiple endpoints (but see below).
IObjectPrx lookupReply = _replyAdapter.CreateProxy("dummy", IObjectPrx.Factory);
Debug.Assert(lookupReply.InvocationMode == InvocationMode.Datagram);
// Create one lookup proxy per endpoint from the given proxy. We want to send a multicast datagram on
// each of the lookup proxy.
// TODO: this code is incorrect now that the default published endpoints are no longer an expansion
// of the object adapter endpoints.
foreach (Endpoint endpoint in _lookup.Endpoints)
{
if (!endpoint.IsDatagram)
{
throw new InvalidConfigurationException("Ice.Discovery.Lookup can only have udp endpoints");
}
ILookupPrx key = _lookup.Clone(endpoints: ImmutableArray.Create(endpoint));
if (endpoint["interface"] is string mcastInterface && mcastInterface.Length > 0)
{
Endpoint? q = lookupReply.Endpoints.FirstOrDefault(e => e.Host == mcastInterface);
if (q != null)
{
_lookups[key] = lookupReply.Clone(endpoints: ImmutableArray.Create(q));
}
}
if (!_lookups.ContainsKey(key))
{
// Fallback: just use the given lookup reply proxy if no matching endpoint found.
_lookups[key] = lookupReply;
}
}
Debug.Assert(_lookups.Count > 0);
// Add lookup Ice object
_multicastAdapter.Add("IceDiscovery/Lookup", new Lookup(registryServant, communicator));
}
internal Task ActivateAsync(CancellationToken cancel) =>
Task.WhenAll(_locatorAdapter.ActivateAsync(cancel),
_multicastAdapter.ActivateAsync(cancel),
_replyAdapter.ActivateAsync(cancel));
/// Invokes a find or resolve request on a Lookup object and processes the reply(ies).
/// A delegate that performs the remote call. Its parameters correspond to an entry in
/// the _lookups dictionary.
/// The reply servant.
private async Task InvokeAsync(
Func findAsync,
ReplyServant replyServant)
{
// We retry only when at least one findAsync request is sent successfully and we don't get any reply.
// TODO: this _retryCount is really an attempt count not a retry count.
for (int i = 0; i < _retryCount; ++i)
{
TimeSpan start = Time.Elapsed;
var timeoutTask = Task.Delay(_timeout, replyServant.CancellationToken);
var sendTask = Task.WhenAll(_lookups.Select(
entry =>
{
try
{
return findAsync(entry.Key, entry.Value);
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}));
Task task = await Task.WhenAny(sendTask, replyServant.Task, timeoutTask).ConfigureAwait(false);
if (task == sendTask)
{
if (sendTask.Status == TaskStatus.Faulted)
{
if (sendTask.Exception!.InnerExceptions.Count == _lookups.Count)
{
// All the tasks failed: log warning and return empty result (no retry)
_replyAdapter.Communicator.Logger.Warning(
@$"Ice discovery failed to send lookup request using `{_lookup
}':\n{sendTask.Exception!.InnerException!}");
replyServant.SetEmptyResult();
return await replyServant.Task.ConfigureAwait(false);
}
}
// For Canceled or RanToCompletion, we assume at least one send was successful. If we're wrong,
// we'll timeout soon anyways.
task = await Task.WhenAny(replyServant.Task, timeoutTask).ConfigureAwait(false);
}
if (task == replyServant.Task)
{
return await replyServant.Task.ConfigureAwait(false);
}
else if (task.IsCanceled)
{
// If the timeout was canceled we delay the completion of the request to give a chance to other
// members of this replica group to reply
return await
replyServant.GetReplicaGroupRepliesAsync(start, _latencyMultiplier).ConfigureAwait(false);
}
// else timeout, so we retry until _retryCount
}
replyServant.SetEmptyResult(); // _retryCount exceeded
return await replyServant.Task.ConfigureAwait(false);
}
}
/// The base class of all Reply servant that helps collect / gather the reply(ies) to a lookup reques.
///
internal class ReplyServant : IObject, IDisposable
{
internal CancellationToken CancellationToken => _cancellationSource.Token;
internal Identity Identity { get; }
internal Task Task => _completionSource.Task;
private readonly CancellationTokenSource _cancellationSource;
private readonly TaskCompletionSource _completionSource;
private readonly TResult _emptyResult;
private readonly ObjectAdapter _replyAdapter;
public void Dispose()
{
_cancellationSource.Dispose();
_replyAdapter.Remove(Identity);
}
internal async Task GetReplicaGroupRepliesAsync(TimeSpan start, int latencyMultiplier)
{
// This method is called by InvokeAsync after the first reply from a replica group to wait for additional
// replies from the replica group.
TimeSpan latency = (Time.Elapsed - start) * latencyMultiplier;
if (latency == TimeSpan.Zero)
{
latency = TimeSpan.FromMilliseconds(1);
}
await System.Threading.Tasks.Task.Delay(latency).ConfigureAwait(false);
SetResult(CollectReplicaReplies());
return await Task.ConfigureAwait(false);
}
internal void SetEmptyResult() => _completionSource.SetResult(_emptyResult);
private protected ReplyServant(TResult emptyResult, ObjectAdapter replyAdapter)
{
// Add servant (this) to object adapter with new UUID identity.
Identity = replyAdapter.AddWithUUID(this, IObjectPrx.Factory).Identity;
_cancellationSource = new();
_completionSource = new();
_emptyResult = emptyResult;
_replyAdapter = replyAdapter;
}
private protected void Cancel() => _cancellationSource.Cancel();
private protected virtual TResult CollectReplicaReplies()
{
Debug.Assert(false); // must be overridden if called by WaitForReplicaGroupRepliesAsync
return _emptyResult;
}
private protected void SetResult(TResult result) => _completionSource.SetResult(result);
}
/// Servant class that implements the Slice interface FindAdapterByIdReply.
internal sealed class FindAdapterByIdReply : ReplyServant, IAsyncFindAdapterByIdReply
{
private readonly object _mutex = new();
private readonly HashSet _proxies = new();
public ValueTask FoundAdapterByIdAsync(
string adapterId,
IObjectPrx proxy,
bool isReplicaGroup,
Current current,
CancellationToken cancel)
{
if (isReplicaGroup)
{
lock (_mutex)
{
_proxies.Add(proxy);
if (_proxies.Count == 1)
{
// Cancel WhenAny and let InvokeAsync wait for additional replies from the replica group, and
// later call CollectReplicaReplies.
Cancel();
}
}
}
else
{
SetResult(proxy);
}
return default;
}
internal FindAdapterByIdReply(ObjectAdapter replyAdapter)
: base(emptyResult: null, replyAdapter)
{
}
private protected override IObjectPrx? CollectReplicaReplies()
{
lock (_mutex)
{
Debug.Assert(_proxies.Count > 0);
var endpoints = new List();
IObjectPrx result = _proxies.First();
foreach (IObjectPrx prx in _proxies)
{
endpoints.AddRange(prx.Endpoints);
}
return result.Clone(endpoints: endpoints);
}
}
}
/// Servant class that implements the Slice interface FindObjectByIdReply.
internal class FindObjectByIdReply : ReplyServant, IAsyncFindObjectByIdReply
{
public ValueTask FoundObjectByIdAsync(Identity id, IObjectPrx proxy, Current current, CancellationToken cancel)
{
SetResult(proxy);
return default;
}
internal FindObjectByIdReply(ObjectAdapter replyAdapter)
: base(emptyResult: null, replyAdapter)
{
}
}
/// Servant class that implements the Slice interface ResolveAdapterIdReply.
internal sealed class ResolveAdapterIdReply : ReplyServant>, IAsyncResolveAdapterIdReply
{
private readonly object _mutex = new();
private readonly HashSet _endpointDataSet = new();
public ValueTask FoundAdapterIdAsync(
EndpointData[] endpoints,
bool isReplicaGroup,
Current current,
CancellationToken cancel)
{
if (isReplicaGroup)
{
lock (_mutex)
{
bool firstReply = _endpointDataSet.Count == 0;
_endpointDataSet.UnionWith(endpoints);
if (firstReply)
{
Cancel();
}
}
}
else
{
SetResult(endpoints);
}
return default;
}
internal ResolveAdapterIdReply(ObjectAdapter replyAdapter)
: base(ImmutableArray.Empty, replyAdapter)
{
}
private protected override IReadOnlyList CollectReplicaReplies()
{
lock (_mutex)
{
Debug.Assert(_endpointDataSet.Count > 0);
return _endpointDataSet.ToList();
}
}
}
/// Servant class that implements the Slice interface ResolveWellKnownProxyReply.
internal class ResolveWellKnownProxyReply : ReplyServant, IAsyncResolveWellKnownProxyReply
{
public ValueTask FoundWellKnownProxyAsync(string adapterId, Current current, CancellationToken cancel)
{
SetResult(adapterId);
return default;
}
internal ResolveWellKnownProxyReply(ObjectAdapter replyAdapter)
: base(emptyResult: "", replyAdapter)
{
}
}
}