.NET 上的 gRPC 侦听器

作者:Ernest Nguyen

侦听器是一个 gRPC 概念,允许应用与传入或传出的 gRPC 调用进行交互。 它们提供了一种方法来扩充请求处理管道。

侦听器针对通道或服务进行配置,并针对每个 gRPC 调用自动执行。 由于侦听器对用户的应用程序逻辑是透明的,因此它们是适用于常见情况(例如日志记录、监视、身份验证和验证)的极佳解决方案。

Interceptor 类型

可以通过创建从 Interceptor 类型继承的类,为 gRPC 服务器和客户端实现侦听器:

public class ExampleInterceptor : Interceptor
{
}

默认情况下,Interceptor 基类不执行任何操作。 通过在侦听器实现中重写相应的基类方法,将行为添加到侦听器。

客户端侦听器

gRPC 客户端侦听器截获传出的 RPC 调用。 它们提供对发送的请求、传入的响应以及客户端调用的上下文的访问权限。

Interceptor 方法为客户端重写以下项:

  • BlockingUnaryCall:截获一元 RPC 的阻塞调用。
  • AsyncUnaryCall:截获一元 RPC 的异步调用。
  • AsyncClientStreamingCall:截获客户端流式处理 RPC 的异步调用。
  • AsyncServerStreamingCall:截获服务器流式处理 RPC 的异步调用。
  • AsyncDuplexStreamingCall:截获双向流式处理 RPC 的异步调用。

警告

尽管 BlockingUnaryCallAsyncUnaryCall 都是指一元 RPC,但二者不可互换。 阻塞调用不会被 AsyncUnaryCall 截获,异步调用不会被 BlockingUnaryCall 截获。

创建客户端 gRPC 侦听器

以下代码展示了截获一元调用的异步调用的一个基本示例:

public class ClientLoggingInterceptor : Interceptor
{
    private readonly ILogger _logger;

    public ClientLoggingInterceptor(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<ClientLoggingInterceptor>();
    }

    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        _logger.LogInformation("Starting call. Type/Method: {Type} / {Method}",
            context.Method.Type, context.Method.Name);
        return continuation(request, context);
    }
}

重写 AsyncUnaryCall

  • 截获异步一元调用。
  • 记录有关调用的详细信息。
  • 调用传入方法的 continuation 参数。 如果这是最后一个侦听器,则调用链中的下一个侦听器或基础调用的调用程序。

每种服务方法的 Interceptor 上的方法具有不同的签名。 但是,continuationcontext 参数背后的概念保持不变:

  • continuation 是一个委托,它调用链中的下一个侦听器或基础调用的调用程序(如果链中没有侦听器)。 调用零次或多次并不是错误。 侦听器不需要返回从 continuation 委托返回的调用表示形式(如果是一元 RPC,则为 AsyncUnaryCall)。 如果省略委托调用并返回你自己的调用表示形式实例,会中断侦听器的链,并立即返回关联的响应。
  • context 包含与客户端调用关联的作用域值。 使用 context 传递元数据,例如安全主体、凭据或跟踪数据。 此外,context 包含有关截止时间和取消功能的信息。 有关详细信息,请参阅具有截止时间和取消功能的可靠的 gRPC 服务

在客户端侦听器中等待响应

侦听器可以通过更新 AsyncUnaryCall<TResponse>.ResponseAsyncAsyncClientStreamingCall<TRequest, TResponse>.ResponseAsync 值,等待一元调用和客户端流式处理调用中的响应。

public class ErrorHandlerInterceptor : Interceptor
{
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        var call = continuation(request, context);

        return new AsyncUnaryCall<TResponse>(
            HandleResponse(call.ResponseAsync),
            call.ResponseHeadersAsync,
            call.GetStatus,
            call.GetTrailers,
            call.Dispose);
    }

    private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> inner)
    {
        try
        {
            return await inner;
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException("Custom error", ex);
        }
    }
}

前面的代码:

  • 创建了一个新侦听器,重写了 AsyncUnaryCall
  • 重写 AsyncUnaryCall
    • 调用 continuation 参数以调用侦听器链中的下一项。
    • 根据延续的结果创建一个新的 AsyncUnaryCall<TResponse> 实例。
    • 使用 HandleResponse 方法包装 ResponseAsync 任务。
    • 使用 HandleResponse 等待响应。 通过等待响应,可在客户端收到响应后添加逻辑。 通过在 try-catch 块中等待响应,可以记录来自调用的错误。

有关如何创建客户端侦听器的详细信息,请参阅 grpc/grpc-dotnet GitHub 存储库中的 ClientLoggerInterceptor.cs 示例

配置客户端侦听器

gRPC 客户端侦听器是在通道上配置的。

以下代码:

  • 使用 GrpcChannel.ForAddress 创建一个通道。
  • 使用 Intercept 扩展方法将通道配置为使用侦听器。 请注意,此方法返回 CallInvoker。 可以从调用程序(就像通道一样)创建强类型 gRPC 客户端。
  • 从调用程序创建客户端。 客户端发出的 gRPC 调用会自动执行侦听器。
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var invoker = channel.Intercept(new ClientLoggerInterceptor());

var client = new Greeter.GreeterClient(invoker);

可以链接 Intercept 扩展方法,为一个通道配置多个侦听器。 或者,有一个接受多个侦听器的 Intercept 重载。 可以针对单个 gRPC 调用执行任意数目的侦听器,如以下示例所示:

var invoker = channel
    .Intercept(new ClientTokenInterceptor())
    .Intercept(new ClientMonitoringInterceptor())
    .Intercept(new ClientLoggerInterceptor());

侦听器是以链接的 Intercept 扩展方法的相反顺序调用的。 在前面的代码中,侦听器是按以下顺序调用的:

  1. ClientLoggerInterceptor
  2. ClientMonitoringInterceptor
  3. ClientTokenInterceptor

有关如何使用 gRPC 客户端工厂配置侦听器的信息,请参阅 .NET 中的 gRPC 客户端工厂集成

服务器侦听器

gRPC 服务器侦听器截获传入的 RPC 请求。 它们提供对传入的请求、传出的响应和服务器端调用的上下文的访问权限。

Interceptor 方法为服务器重写以下项:

  • UnaryServerHandler:截获一元 RPC。
  • ClientStreamingServerHandler:截获客户端流式处理 RPC。
  • ServerStreamingServerHandler:截获服务器流式处理 RPC。
  • DuplexStreamingServerHandler:截获双向流式处理 RPC。

创建服务器 gRPC 侦听器

以下代码展示了截获传入的一元 RPC 的示例:

public class ServerLoggerInterceptor : Interceptor
{
    private readonly ILogger _logger;

    public ServerLoggerInterceptor(ILogger<ServerLoggerInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        _logger.LogInformation("Starting receiving call. Type/Method: {Type} / {Method}",
            MethodType.Unary, context.Method);
        try
        {
            return await continuation(request, context);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"Error thrown by {context.Method}.");
            throw;
        }
    }
}

重写 UnaryServerHandler

  • 截获传入的一元调用。
  • 记录有关调用的详细信息。
  • 调用传入方法的 continuation 参数。 如果这是最后一个侦听器,则调用链中的下一个侦听器,或者调用服务处理程序。
  • 记录任何异常。 通过等待延续,可在服务方法执行后添加逻辑。 通过在 try-catch 块中等待延续,可以记录来自方法的错误。

客户端和服务器侦听器方法的签名类似:

  • continuation 表示传入的 RPC 的一个委托,它调用链中的下一个侦听器或服务器处理程序(如果链中没有侦听器)。 与客户端侦听器类似,你随时都可以调用它,并且无需直接从延续委托返回响应。 通过等待延续,可在服务处理程序执行后添加出站逻辑。
  • context 包含与服务器端调用关联的元数据,例如请求元数据、截止时间、取消功能或 RPC 结果。

有关如何创建服务器侦听器的详细信息,请参阅 grpc/grpc-dotnet GitHub 存储库中的 ServerLoggerInterceptor.cs 示例

配置服务器侦听器

gRPC 服务器侦听器是在启动时配置的。 以下代码:

  • 使用 AddGrpc 将 gRPC 添加到应用。
  • 通过将 ServerLoggerInterceptor 添加到服务选项的 Interceptors 集合中,为所有服务配置它。
public void ConfigureServices(IServiceCollection services)
{
    services.AddGrpc(options =>
    {
        options.Interceptors.Add<ServerLoggerInterceptor>();
    });
}

也可以通过使用 AddServiceOptions 并指定服务类型,为特定服务配置侦听器。

public void ConfigureServices(IServiceCollection services)
{
    services
        .AddGrpc()
        .AddServiceOptions<GreeterService>(options =>
        {
            options.Interceptors.Add<ServerLoggerInterceptor>();
        });
}

侦听器是按它们添加到 InterceptorCollection 的顺序运行的。 如果同时配置了全局和单一服务侦听器,则全局配置的侦听器在针对单一服务配置的侦听器之前运行。

默认情况下,gRPC 服务器侦听器为每个请求设置了生存期。 通过向依赖关系注入注册侦听器类型,可以重写此行为。 以下示例注册 ServerLoggerInterceptor 的单一实例生存期:

public void ConfigureServices(IServiceCollection services)
{
    services.AddGrpc(options =>
    {
        options.Interceptors.Add<ServerLoggerInterceptor>();
    });

    services.AddSingleton<ServerLoggerInterceptor>();
}

gRPC 侦听器与中间件

与基于 C-core 的 gRPC 应用中的拦截器相比,ASP.NET Core 中间件提供类似功能。 ASP.NET Core 中间件和拦截器在概念上类似。 两者:

  • 用于构造处理 gRPC 请求的管道。
  • 允许在管道中的下一个组件前或后执行工作。
  • 提供对 HttpContext 的访问权限:
    • 在中间件中,HttpContext 是参数。
    • 在侦听器中,可以通过 ServerCallContext.GetHttpContext 扩展方法使用 ServerCallContext 参数访问 HttpContext。 此功能专门用于在 ASP.NET Core 中运行的侦听器。

gRPC 拦截器与 ASP.NET Core 中间件的不同之处:

  • 拦截器:
    • 使用 ServerCallContext 在 gRPC 抽象层上操作。
    • 提供对以下内容的访问权限:
      • 发送到调用的反序列化消息。
      • 序列化之前从调用返回的消息。
    • 可以捕获和处理从 gRPC 服务引发的异常。
  • 中间件:
    • 针对所有 HTTP 请求运行。
    • 在 gRPC 拦截器之前运行。
    • 对基础 HTTP/2 消息进行操作。
    • 只能访问请求和响应流中的字节。

其他资源