Partilhar via


Criar serviços e métodos gRPC

Por James Newton-King

Este documento explica como criar serviços e métodos gRPC em C#. Os tópicos incluem:

  • Como definir serviços e métodos em arquivos .proto.
  • Código gerado usando ferramentas gRPC em C#.
  • Implementando serviços e métodos gRPC.

Criar novos serviços gRPC

Os serviços gRPC com C# introduziram a abordagem de primeiro contrato do gRPC para desenvolvimento de APIs. Serviços e mensagens são definidos em arquivos .proto: Em seguida, as ferramentas em C# geram código a partir de .proto. Para ativos do lado do servidor, um tipo base abstrato é gerado para cada serviço, juntamente com classes para qualquer mensagem.

O seguinte arquivo .proto:

  • Define um serviço Greeter.
  • O serviço Greeter define uma chamada SayHello.
  • SayHello envia uma mensagem HelloRequest e recebe uma mensagem HelloReply
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

As ferramentas em C# geram o tipo base GreeterBase em C#:

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

Por padrão, o GreeterBase gerado não faz nada. Seu método virtual SayHello retornará um erro UNIMPLEMENTED para todos os clientes que o chamam. Para que o serviço seja útil, um aplicativo deve criar uma implementação concreta de GreeterBase:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

O ServerCallContext fornece o contexto para uma chamada do lado do servidor.

A implementação do serviço é registrada com o aplicativo. Se o serviço for hospedado pelo gRPC do ASP.NET Core, ele deverá ser adicionado ao pipeline de roteamento com o método MapGrpcService.

app.MapGrpcService<GreeterService>();

Confira os serviços gRPC com ASP.NET Core para obter mais informações.

Implementar métodos gRPC

Um serviço gRPC pode ter diferentes tipos de métodos. A forma como as mensagens são enviadas e recebidas por um serviço depende do tipo de método definido. Os tipos de método gRPC são:

  • Unário
  • Streaming de servidor
  • Streaming de cliente
  • Fluxo de dados bidirecional

As chamadas de fluxo de dados são especificadas com a palavra-chave stream no arquivo .proto. stream pode ser colocado na mensagem de solicitação, na mensagem de resposta ou em ambas.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Cada tipo de chamada tem uma assinatura de método diferente. Substituir métodos gerados do tipo de serviço base abstrato em uma implementação concreta garante que os argumentos corretos e o tipo de retorno sejam usados.

Método unário

Um método unário tem a mensagem de solicitação como um parâmetro e retorna a resposta. Uma chamada unária é concluída quando a resposta é retornada.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Chamadas unárias são as mais semelhantes às ações em controladores de APIs Web. Uma diferença importante que os métodos gRPC têm das ações é que os métodos gRPC não são capazes de associar partes de uma solicitação a diferentes argumentos de método. Os métodos gRPC sempre têm um argumento de mensagem para os dados de solicitação de entrada. Vários valores ainda podem ser enviados a um serviço gRPC adicionando campos à mensagem de solicitação:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Método de fluxo de dados de servidor

Um método de fluxo de dados de servidor tem a mensagem de solicitação como um parâmetro. Como várias mensagens podem ser transmitidas de volta para o chamador, responseStream.WriteAsync é usado para enviar mensagens de resposta. Uma chamada de fluxo de dados de servidor é concluída quando o método retorna.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

O cliente não tem como enviar mensagens ou dados adicionais depois que o método de fluxo de dados de servidor for iniciado. Alguns métodos de fluxo de dados foram projetados para serem executados para sempre. Para métodos de fluxo de dados contínuos, um cliente pode cancelar a chamada quando ela não for mais necessária. Quando o cancelamento acontece, o cliente envia um sinal para o servidor e o ServerCallContext.CancellationToken é gerado. O token CancellationToken deve ser usado no servidor com métodos assíncronos para que:

  • Qualquer trabalho assíncrono seja cancelado junto com a chamada de fluxo de dados.
  • O método seja encerrado rapidamente.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Método de fluxo de dados do cliente

Um método de fluxo de dados do cliente é iniciado sem que o método receba uma mensagem. O parâmetro requestStream é usado para ler mensagens do cliente. Uma chamada de fluxo de dados do cliente é concluída quando uma mensagem de resposta é retornada:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Método de fluxo de dados bidirecional

Um método de fluxo de dados do cliente é iniciado sem que o método receba uma mensagem. O parâmetro requestStream é usado para ler mensagens do cliente. O método pode optar por enviar mensagens com responseStream.WriteAsync. Uma chamada de fluxo de dados bidirecional é concluída quando o método retorna:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

O código anterior:

  • Envia uma resposta para cada solicitação.
  • É um uso básico do fluxo de dados bidirecional.

É possível dar suporte a cenários mais complexos, como ler solicitações e enviar respostas simultaneamente:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Em um método de fluxo de dados bidirecional, o cliente e o serviço podem enviar mensagens entre si a qualquer momento. A melhor implementação de um método bidirecional varia dependendo dos requisitos.

Acessar cabeçalhos de solicitação gRPC

Uma mensagem de solicitação não é a única maneira de um cliente enviar dados para um serviço gRPC. Os valores de cabeçalho estão disponíveis em um serviço usando ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Multithreading com métodos de fluxo de dados gRPC

Há considerações importantes a fazer ao implementar métodos de fluxo de dados gRPC que usam vários threads.

Acesso thread-safe de leitura e gravação

IAsyncStreamReader<TMessage> e IServerStreamWriter<TMessage> podem ser usados por apenas um thread de cada vez. Para um método gRPC de fluxo de dados, vários threads não podem ler novas mensagens com requestStream.MoveNext() simultaneamente. E vários threads não podem gravar novas mensagens com responseStream.WriteAsync(message) simultaneamente.

Uma maneira segura de permitir que vários threads interajam com um método gRPC é usar o padrão produtor-consumidor com System.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

O método de fluxo de dados de servidor gRPC anterior:

  • Cria um canal limitado para produzir e consumir mensagens DataResult.
  • Inicia uma tarefa para ler mensagens do canal e gravá-las no fluxo de resposta.
  • Grava mensagens no canal de vários threads.

Observação

Métodos de fluxo de dados bidirecional tomam IAsyncStreamReader<TMessage> e IServerStreamWriter<TMessage> como argumentos. É seguro usar esses tipos em threads separados uns dos outros.

Interagir com um método gRPC após o término de uma chamada

Uma chamada gRPC termina no servidor quando o método gRPC é encerrado. Os seguintes argumentos passados para métodos gRPC não são seguros de usar após o término da chamada:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Se um método gRPC iniciar tarefas em segundo plano que usam esses tipos, ele deverá concluir as tarefas antes que o método gRPC seja encerrado. Continuar a usar o contexto, o leitor de fluxo ou o gravador de fluxo após a existência do método gRPC causará erros e comportamento imprevisível.

No exemplo a seguir, o método de fluxo de dados de servidor pode gravar no fluxo de resposta após a conclusão da chamada:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

Para o exemplo anterior, a solução é aguardar a tarefa de gravação antes de sair do método:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Recursos adicionais