Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RoundRobin balancer with health checks on subchannels #2515

Open
pipka76 opened this issue Aug 25, 2024 · 0 comments
Open

RoundRobin balancer with health checks on subchannels #2515

pipka76 opened this issue Aug 25, 2024 · 0 comments
Labels
question Further information is requested

Comments

@pipka76
Copy link

pipka76 commented Aug 25, 2024

Hi,

I'm in the process of implementing a custom load balancer for a gRPC client. Our infrastructure is massive (multiple Docker Swarm clusters with a lot of instances of gRPC microservices), and we use the standard RoundRobin implementation of client-side balancing. Unfortunately, the standard implementation is not sufficient: the fact that the subchannel transport is able to connect at the TCP level does not always mean that it will be able to perform a gRPC call over that connection.

What I'm trying to achieve is connection proofing at the level of the balancer's picker. My implementation works, and it is pasted at the end of this post.

My question is: how (if it is possible at all) can I make the health check call be multiplexed over the very same TCP connection that the actual gRPC call will use right after the picker returns the subchannel? My current implementation below creates a new TCP connection to call the health service.

Many thanks. George

/// <summary>
/// Load balancer that hands every set of ready subchannels to a
/// <see cref="RoundRobinOnHealthyPicker"/>, which performs the actual selection.
/// </summary>
internal sealed class RoundRobinOnHealthyBalancer : SubchannelsLoadBalancer
{
    /// <summary>
    /// Creates the balancer and forwards both dependencies to the base class.
    /// </summary>
    /// <param name="controller">Channel control helper passed through to the base balancer.</param>
    /// <param name="loggerFactory">Logger factory passed through to the base balancer.</param>
    public RoundRobinOnHealthyBalancer(IChannelControlHelper controller, ILoggerFactory loggerFactory)
        : base(controller, loggerFactory)
    {
    }

    /// <summary>
    /// Builds a new picker over the supplied ready subchannels.
    /// </summary>
    /// <param name="readySubchannels">Subchannels the new picker will rotate over.</param>
    /// <returns>A fresh <see cref="RoundRobinOnHealthyPicker"/>.</returns>
    protected override SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels)
        => new RoundRobinOnHealthyPicker(readySubchannels);
}

/// <summary>
/// Round-robin picker that only returns a subchannel after a gRPC health check
/// against that subchannel's own endpoint has succeeded.
/// </summary>
/// <remarks>
/// A successful check is cached in the address attributes and reused until it is
/// older than <see cref="HealthStatusMaxAge"/>. Failed checks are not cached:
/// the endpoint is probed again the next time the rotation reaches it.
/// The probe is a blocking unary call bounded by <see cref="HealthCheckDeadline"/>.
/// </remarks>
internal sealed class RoundRobinOnHealthyPicker : SubchannelPicker
{
    private const string HealthStatusKey = "HealthStatus";
    private const string HealthStatusLastCheckKey = "HealthStatusLastCheck";
    private const string HealthStatusServing = "SERVING";

    // Upper bound for a single health probe.
    private static readonly TimeSpan HealthCheckDeadline = TimeSpan.FromMilliseconds(1000);

    // A cached SERVING result older than this is re-validated.
    private static readonly TimeSpan HealthStatusMaxAge = TimeSpan.FromSeconds(10);

    // Attribute keys are immutable, so build them once instead of on every pick.
    private static readonly BalancerAttributesKey<string> StatusKey =
        new BalancerAttributesKey<string>(HealthStatusKey);
    private static readonly BalancerAttributesKey<string> LastCheckKey =
        new BalancerAttributesKey<string>(HealthStatusLastCheckKey);

    private static readonly HttpClientHandler _httpClientHandler;
    private static readonly HttpClient _httpClient;

    // Internal for testing
    internal readonly List<Subchannel> _subchannels;
    private long _pickCount;

    static RoundRobinOnHealthyPicker()
    {
        // NOTE(review): certificate validation is disabled for health probes, as in
        // the original snippet; confirm this is acceptable for the deployment.
        _httpClientHandler = new HttpClientHandler();
        _httpClientHandler.ServerCertificateCustomValidationCallback =
            HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
        _httpClient = new HttpClient(_httpClientHandler);
    }

    /// <summary>
    /// Initializes the picker with a snapshot of the given subchannels and a random
    /// starting offset so that pickers do not all start at the first entry.
    /// </summary>
    /// <param name="subchannels">Subchannels to rotate over; <c>null</c> is treated as empty.</param>
    public RoundRobinOnHealthyPicker(IReadOnlyList<Subchannel> subchannels)
    {
        if (subchannels != null)
        {
            var pickCount = new Random().Next(0, subchannels.Count);
            _subchannels = subchannels.ToList();
            _pickCount = pickCount;
            return;
        }

        _subchannels = new List<Subchannel>();
        _pickCount = 0;
    }

    /// <summary>
    /// Runs a gRPC health check against the endpoint of <paramref name="address"/>.
    /// </summary>
    /// <param name="context">Pick context; only the scheme of its request URI is used.</param>
    /// <param name="address">Address of the candidate subchannel to probe.</param>
    /// <returns>
    /// <c>true</c> when the service reports SERVING or does not implement the health
    /// service; <c>false</c> for any other status or any other RPC failure.
    /// </returns>
    private static bool IsSubchannelHealthy(PickContext context, BalancerAddress address)
    {
        // The request URI still carries the channel's logical address at pick time,
        // so the probe target is built from the subchannel's own endpoint instead.
        // NOTE(review): https is assumed when the request carries no URI; confirm.
        var scheme = context.Request?.RequestUri?.Scheme ?? Uri.UriSchemeHttps;
        var target = new UriBuilder(scheme, address.EndPoint.Host, address.EndPoint.Port).Uri;

        // Dispose the probe channel so probes do not accumulate channels; the shared
        // HttpClient is supplied through the options and stays alive.
        using var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions() { HttpClient = _httpClient });
        var healthClient = new HealthClient(channel.CreateCallInvoker());

        try
        {
            var response = healthClient.Check(
                new HealthCheckRequest(),
                new CallOptions(deadline: DateTime.UtcNow.Add(HealthCheckDeadline)));
            return response.Status == HealthCheckResponse.Types.ServingStatus.Serving;
        }
        catch (Grpc.Core.RpcException rpcEx) when (rpcEx.StatusCode == StatusCode.Unimplemented)
        {
            // A server without the health service is treated as healthy.
            return true;
        }
        catch (Grpc.Core.RpcException)
        {
            return false;
        }
    }

    /// <summary>
    /// Tells whether the last recorded health check of <paramref name="address"/> is
    /// missing, unreadable, or older than <see cref="HealthStatusMaxAge"/>.
    /// </summary>
    private static bool IsLastCheckStale(BalancerAddress address)
    {
        if (!address.Attributes.TryGetValue(LastCheckKey, out string? rawLastCheck))
        {
            return true;
        }

        // The timestamp is written as a UTC round-trip ("O") string, so parse it
        // culture-independently and keep its kind; an unreadable value forces a check.
        if (!DateTime.TryParse(
                rawLastCheck,
                System.Globalization.CultureInfo.InvariantCulture,
                System.Globalization.DateTimeStyles.RoundtripKind,
                out DateTime lastCheckedOn))
        {
            return true;
        }

        return lastCheckedOn.ToUniversalTime() < DateTime.UtcNow - HealthStatusMaxAge;
    }

    /// <summary>
    /// Picks the next subchannel in rotation whose endpoint passes the health check.
    /// </summary>
    /// <param name="context">Context of the call being balanced.</param>
    /// <returns>
    /// A result for the first healthy subchannel found within one full rotation, or a
    /// <c>NotFound</c> failure result when none qualifies.
    /// </returns>
    public override PickResult Pick(PickContext context)
    {
        int subchannelCount = _subchannels.Count;

        for (int attempt = 0; attempt < subchannelCount; attempt++)
        {
            var c = Interlocked.Increment(ref _pickCount);
            // Clear the sign bit so a wrapped counter can never yield a negative index.
            var index = (int)((c & long.MaxValue) % subchannelCount);
            var item = _subchannels[index];

            // Read the address once so every later access sees the same instance.
            var address = item.CurrentAddress;
            if (address == null)
            {
                continue;
            }

            if (IsLastCheckStale(address) || !address.Attributes.TryGetValue(StatusKey, out string? hStatus))
            {
                address.Attributes.Set(LastCheckKey, DateTime.UtcNow.ToString("O"));
                if (!IsSubchannelHealthy(context, address))
                {
                    address.Attributes.Remove(StatusKey);
                    continue;
                }

                hStatus = HealthStatusServing;
                address.Attributes.Set(StatusKey, hStatus);
            }

            if (hStatus != HealthStatusServing)
            {
                continue;
            }

            return PickResult.ForSubchannel(item);
        }

        Console.WriteLine("No healthy subchannels found.");

        return PickResult.ForFailure(new Grpc.Core.Status(Grpc.Core.StatusCode.NotFound, "No healthy subchannels found."));
    }

    /// <summary>
    /// Returns the string forms of all subchannels, separated by ", ".
    /// </summary>
    public override string ToString()
    {
        return string.Join(", ", _subchannels.Select(s => s.ToString()));
    }
}
@pipka76 pipka76 added the question Further information is requested label Aug 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

1 participant