创建 gRPC 服务和方法

作者:James Newton-King

本文档介绍如何以 C# 创建 gRPC 服务和方法。 主题包括:

  • 如何在 .proto 文件中定义服务和方法。
  • 使用 gRPC C# 工具生成的代码。
  • 实现 gRPC 服务和方法。

创建新的 gRPC 服务

使用 C# 的 gRPC 服务介绍了 gRPC 的 API 开发协定优先方法。 服务和消息是在 .proto 文件中定义的。 然后,C# 工具从 .proto 文件生成代码。 对于服务器端资产,将为每个服务生成一个抽象基类型,同时为所有消息生成类。

以下 .proto 文件:

  • 定义 Greeter 服务。
  • Greeter 服务定义 SayHello 调用。
  • SayHello 发送 HelloRequest 消息并接收 HelloReply 消息
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

C# 工具生成 C# GreeterBase 基类型:

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

默认情况下,生成的 GreeterBase 不执行任何操作。 它的虚拟 SayHello 方法会将 UNIMPLEMENTED 错误返回到调用它的任何客户端。 为了使服务有用,应用必须创建 GreeterBase 的具体实现:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext 提供服务器端调用的上下文。

服务实现已注册到应用。 如果服务由 ASP.NET Core gRPC 托管,则应使用 MapGrpcService 方法将其添加到路由管道。

app.MapGrpcService<GreeterService>();

有关详细信息,请参阅 ASP.NET Core 的 gRPC 服务

实现 gRPC 方法

gRPC 服务可以有不同类型的方法。 服务发送和接收消息的方式取决于所定义的方法的类型。 gRPC 方法类型如下:

  • 一元
  • 服务器流式处理
  • 客户端流式处理
  • 双向流式处理

流式处理调用是使用 stream 关键字在 .proto 文件中指定的。 stream 可以放置在调用的请求消息和/或响应消息中。

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

每个调用类型都有不同的方法签名。 在具体实现中替代从抽象基本服务类型生成的方法,可确保使用正确的参数和返回类型。

一元方法

一元方法将请求消息作为参数,并返回响应。 返回响应时,一元调用完成。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

一元调用与 Web API 控制器上的操作最为相似。 gRPC 方法与操作的一个重要区别是,gRPC 方法无法将请求的某些部分绑定到不同的方法参数。 对于传入请求数据,gRPC 方法始终有一个消息参数。 通过在请求消息中添加字段,仍可以将多个值发送到 gRPC 服务:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

服务器流式处理方法

服务器流式处理方法将请求消息作为参数。 由于可以将多个消息流式传输回调用方,因此可使用 responseStream.WriteAsync 发送响应消息。 当方法返回时,服务器流式处理调用完成。

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

服务器流式处理方法启动后,客户端无法发送其他消息或数据。 某些流式处理方法设计为永久运行。 对于连续流式处理方法,客户端可以在不再需要调用时将其取消。 当发生取消时,客户端会将信号发送到服务器,并引发 ServerCallContext.CancellationToken。 应在服务器上通过异步方法使用 CancellationToken 标记,以实现以下目的:

  • 所有异步工作都与流式处理调用一起取消。
  • 该方法快速退出。
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

客户端流式处理方法

客户端流式处理方法在该方法没有接收消息的情况下启动。 requestStream 参数用于从客户端读取消息。 返回响应消息时,客户端流式处理调用完成:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

双向流式处理方法

双向流式处理方法在该方法没有接收到消息的情况下启动。 requestStream 参数用于从客户端读取消息。 该方法可选择使用 responseStream.WriteAsync 发送消息。 当方法返回时,双向流式处理调用完成:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

前面的代码:

  • 发送每个请求的响应。
  • 是双向流式处理的基本用法。

可以支持更复杂的方案,例如同时读取请求和发送响应:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

在双向流式处理方法中,客户端和服务可在任何时间互相发送消息。 双向方法的最佳实现根据需求而有所不同。

访问 gRPC 请求标头

请求消息并不是客户端将数据发送到 gRPC 服务的唯一方法。 标头值在使用 ServerCallContext.RequestHeaders 的服务中可用。

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

使用 gRPC 流式处理方法的多线程处理

实现使用多个线程的 gRPC 流式处理方法有一些重要的注意事项。

读取器和编写器线程安全性

IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> 一次只能由一个线程使用。 对于流式处理 gRPC 方法,多个线程无法使用 requestStream.MoveNext() 同时读取新消息。 多个线程无法使用 responseStream.WriteAsync(message) 同时写入新消息。

多个线程能够与 gRPC 方法实现交互的一种安全方法是将生成方-使用者模式与 System.Threading.Channels 配合使用。

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

之前的 gRPC 服务器流式处理方法:

  • 创建用于生成和使用 DataResult 消息的有界通道。
  • 启动从通道读取消息,然后将消息写入响应流的任务。
  • 从多个线程将消息写入通道。

注意

双向流式处理方法采用 IAsyncStreamReader<TMessage>IServerStreamWriter<TMessage> 作为自变量。 在彼此独立的线程上使用这些类型是安全的。

调用结束后与 gRPC 方法交互

gRPC 方法退出后,gRPC 调用将在服务器上结束。 在调用结束后,使用以下传递到 gRPC 方法的自变量并不安全:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

如果 gRPC 方法启动使用这些类型的后台任务,则必须在 gRPC 方法退出之前完成任务。 在 gRPC 方法退出后,若继续使用上下文、流读取器或流编写器,将导致错误和不可预知的行为。

在以下示例中,服务器流式处理方法可以在调用完成后写入响应流:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

对于前面的示例,解决方案是在退出方法之前等待写入任务:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

其他资源