Compartir a través de


Creación de servicios y métodos gRPC

Por James Newton-King

En este documento se explica cómo crear servicios y métodos gRPC en C#. Contenido de los temas:

  • Procedimientos para definir servicios y métodos en archivos .proto.
  • Código generado mediante herramientas de gRPC en C#.
  • Implementación de servicios y métodos gRPC.

Creación de servicios gRPC nuevos

Los servicios gRPC con C# presentaron el enfoque "contract-first" de gRPC para el desarrollo de API, en el que primero se diseña el modo de comunicación entre los servicios y, después, los propios servicios. Los servicios y mensajes se definen en los archivos .proto. Posteriormente, las herramientas de C# generan código a partir de los archivos .proto. En el caso de los recursos del lado servidor, se genera un tipo base abstracto para cada servicio, junto con las clases para los mensajes.

El archivo .proto siguiente:

  • Define un servicio Greeter.
  • El servicio Greeter define una llamada a SayHello.
  • SayHello envía un mensaje HelloRequest y recibe un mensaje HelloReply.
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Las herramientas de C# generan el tipo base GreeterBase de C#:

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

De forma predeterminada, el elemento GreeterBase generado no hace nada. Su método SayHello virtual devolverá un error UNIMPLEMENTED a cualquier cliente que llame. Para que el servicio sea útil, una aplicación debe crear una implementación concreta de GreeterBase:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext proporciona el contexto de una llamada del lado del servidor.

La implementación del servicio se registra con la aplicación. Si el servicio lo hospeda gRPC de ASP.NET Core, debe agregarse a la canalización de enrutamiento con el método MapGrpcService.

app.MapGrpcService<GreeterService>();

Consulte Servicios gRPC con ASP.NET Core para obtener más información.

Implementación de métodos gRPC

Un servicio gRPC puede tener distintos tipos de métodos. El modo en el que un servicio envía y recibe los mensajes depende del tipo de método definido. Los tipos de métodos gRPC son los siguientes:

  • Unario
  • Streaming de servidor
  • Streaming de cliente
  • Streaming bidireccional

Las llamadas de streaming se especifican con la palabra clave stream en el archivo .proto. stream se puede colocar en el mensaje de solicitud de una llamada, en el mensaje de respuesta o en ambos.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Cada tipo de llamada tiene una signatura de método distinta. El reemplazo de métodos generados a partir del tipo de servicio base abstracto en una implementación concreta garantiza que se usan los argumentos correctos y el tipo de valor devuelto.

Método unario

Un método unario tiene el mensaje de solicitud como parámetro y devuelve la respuesta. Se completa una llamada unaria cuando se devuelve la respuesta.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Las llamadas unarias son las más parecidas a las acciones en los controladores de API web. Una diferencia importante de los métodos gRPC respecto a las acciones es que los métodos gRPC no pueden enlazar partes de una solicitud a argumentos de método distintos. Los métodos gRPC siempre tienen un argumento de mensaje para los datos de la solicitud entrante. Todavía se pueden enviar varios valores a un servicio gRPC agregando campos al mensaje de solicitud:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Método de streaming de servidor

Un método de streaming de servidor tiene el mensaje de solicitud como parámetro. Dado que se pueden transmitir varios mensajes de vuelta al autor de la llamada, responseStream.WriteAsync se usa para enviar mensajes de respuesta. Una llamada de streaming de servidor se completa cuando el método devuelve.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

El cliente no tiene ninguna manera de enviar mensajes o datos adicionales una vez que se ha iniciado el método de streaming de servidor. Algunos métodos de streaming están diseñados para ejecutarse indefinidamente. En el caso de los métodos de streaming continuos, un cliente puede cancelar la llamada cuando ya no se necesite. Cuando se produce la cancelación, el cliente envía una señal al servidor y se genera ServerCallContext.CancellationToken. El token CancellationToken debe usarse en el servidor con métodos asincrónicos para que ocurra lo siguiente:

  • Cualquier trabajo asincrónico se cancela junto con la llamada de streaming.
  • El método finaliza rápidamente.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Método de streaming de cliente

Un método de streaming de cliente comienza sin que el método reciba un mensaje. El parámetro requestStream se utiliza para leer los mensajes del cliente. Cuando se devuelve un mensaje de respuesta, se completa una llamada de streaming de cliente:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Método de streaming bidireccional

Un método de streaming bidireccional comienza sin que el método reciba un mensaje. El parámetro requestStream se utiliza para leer los mensajes del cliente. El método puede elegir enviar mensajes con responseStream.WriteAsync. Una llamada de streaming bidireccional se completa cuando el método devuelve lo siguiente:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

El código anterior:

  • Envía una respuesta para cada solicitud.
  • Es un uso básico del streaming bidireccional.

Es posible admitir escenarios más complejos, como la lectura de solicitudes y el envío de respuestas de forma simultánea:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

En un método de streaming bidireccional, el cliente y el servicio se pueden enviar mensajes entre sí en cualquier momento. La mejor implementación de un método bidireccional varía en función de los requisitos.

Acceso a los encabezados de solicitud gRPC

Un mensaje de solicitud no es la única manera que tiene un cliente para enviar datos a un servicio gRPC. Los valores de encabezado están disponibles en un servicio mediante ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Multiproceso con métodos de streaming gRPC

Hay consideraciones importantes para implementar métodos de streaming de gRPC que usan varios subprocesos.

Seguridad de los subprocesos lector y escritor

IAsyncStreamReader<TMessage> y IServerStreamWriter<TMessage> solo se pueden usar en un subproceso a la vez. Para un método gRPC de streaming, varios subprocesos no pueden leer mensajes nuevos con requestStream.MoveNext() simultáneamente. Y varios subprocesos no pueden escribir mensajes nuevos con responseStream.WriteAsync(message) simultáneamente.

Una manera segura de permitir que varios subprocesos interactúen con un método gRPC es usar el patrón productor-consumidor con System.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

El método de streaming del servidor gRPC anterior:

  • Crea un canal limitado para producir y consumir mensajes DataResult.
  • Inicia una tarea para leer mensajes del canal y escribirlos en el flujo de respuesta.
  • Escribe mensajes en el canal desde varios subprocesos.

Nota

Los métodos de streaming bidireccionales toman IAsyncStreamReader<TMessage> y IServerStreamWriter<TMessage> como argumentos. Es seguro usar estos tipos en subprocesos independientes entre sí.

Interacción con un método gRPC después de que finalice una llamada

Una llamada gRPC finaliza en el servidor una vez que se cierra el método gRPC. Los argumentos siguientes pasados a los métodos gRPC no son seguros de usar una vez finalizada la llamada:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Si un método gRPC inicia tareas en segundo plano que usan estos tipos, debe completar las tareas antes de que salga el método gRPC. Seguir usando el contexto, el lector de secuencias o el escritor de secuencias después de que exista el método gRPC provoca errores y un comportamiento impredecible.

En el ejemplo siguiente, el método de streaming del servidor puede escribir en la secuencia de respuesta una vez finalizada la llamada:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

En el ejemplo anterior, la solución es esperar a la tarea de escritura antes de salir del método:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Recursos adicionales