Partager via


Créer des services et des méthodes gRPC

Par James Newton-King

Ce document explique comment créer des services et des méthodes gRPC en C#. Les sujets abordés sont les suivants :

  • Comment définir des services et des méthodes dans les fichiers .proto.
  • Code généré à l’aide des outils C# gRPC.
  • Implémentation des services et méthodes gRPC.

Créer de nouveaux services gRPC

Les services gRPC avec C# ont introduit l’approche de contrat en premier de gRPC pour le développement d’API. Les services et les messages sont définis dans des fichiers .proto. Les outils C# génèrent ensuite un code à partir des fichiers .proto. Pour les ressources côté serveur, un type de base abstrait est généré pour chaque service, ainsi que des classes pour tous les messages.

Le fichier .proto suivant :

  • Définit un service Greeter.
  • Le service Greeter définit un appel SayHello.
  • SayHello envoie un message HelloRequest et reçoit un message HelloReply
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Les outils C# génèrent le type de base C# GreeterBase :

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

Par défaut, le GreeterBase généré ne fait rien. Sa méthode SayHello virtuelle renvoie une erreur UNIMPLEMENTED à tous les clients qui l’appellent. Pour que le service soit utile, une application doit créer une implémentation concrète de GreeterBase :

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext donne le contexte d’un appel côté serveur.

L’implémentation du service est inscrite auprès de l’application. Si le service est hébergé par ASP.NET Core gRPC, il doit être ajouté au pipeline de routage avec la méthode MapGrpcService.

app.MapGrpcService<GreeterService>();

Pour plus d’informations, consultez Services gRPC avec ASP.NET Core.

Implémenter des méthodes gRPC

Un service gRPC peut avoir différents types de méthodes. La façon dont les messages sont envoyés et reçus par un service dépend du type de méthode défini. Les types de méthodes gRPC sont les suivants :

  • Unaire
  • Diffusion en continu du serveur
  • Diffusion en continu du client
  • Diffusion en continu bidirectionnelle

Les appels de diffusion en continu sont spécifiés par le mot clé streamdans le fichier .proto. stream peut être placé sur le message de requête d’un appel, le message de réponse ou les deux.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Chaque type d’appel a une signature de méthode différente. Le remplacement des méthodes générées à partir du type de service de base abstrait dans une implémentation concrète garantit l’utilisation des arguments et du type de retour corrects.

Méthode unaire

Une méthode unaire a le message de requête en tant que paramètre et renvoie la réponse. Un appel unaire est terminé lorsque la réponse est renvoyée.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Les appels unaires sont les plus similaires aux actions sur les contrôleurs d’API web. Une différence importante entre les méthodes gRPC et les actions est que les méthodes gRPC ne peuvent pas lier des parties d’une requête à de différents arguments de méthode. Les méthodes gRPC ont toujours un seul argument de message pour les données de requête entrantes. Plusieurs valeurs peuvent toujours être envoyées à un service gRPC en ajoutant des champs sur le message de requête :

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Méthode de diffusion en continu du serveur

Une méthode de diffusion en continu de serveur a le message de requête en tant que paramètre. Étant donné que plusieurs messages peuvent être diffusés en continu vers l’appelant, responseStream.WriteAsync est utilisé pour envoyer des messages de réponse. Un appel de diffusion en continu de serveur est terminé lorsque la méthode retourne.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Le client n’a aucun moyen d’envoyer des messages ou des données supplémentaires une fois que la méthode de diffusion en continu du serveur a démarré. Certaines méthodes de diffusion en continu sont conçues pour s’exécuter indéfiniment. Pour les méthodes de diffusion en continu, un client peut annuler l’appel lorsqu’il n’est plus nécessaire. En cas d’annulation, le client envoie un signal au serveur et serverCallContext.CancellationToken est déclenché. Le jeton CancellationToken doit être utilisé sur le serveur avec des méthodes asynchrones afin que :

  • Tout travail asynchrone soit annulé avec l’appel de diffusion en continu.
  • La méthode s’arrête rapidement.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Méthode de diffusion en continu du client

Une méthode de diffusion en continu du client démarre sans que la méthode reçoive un message. Le paramètre requestStream est utilisé pour lire les messages du client. Un appel de diffusion en continu du client est terminé lorsqu’un message de réponse est retourn :

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Méthode de diffusion en continu bidirectionnelle

Une méthode de diffusion en continu bidirectionnelle démarre sans que la méthode reçoive un message. Le paramètre requestStream est utilisé pour lire les messages du client. La méthode peut envoyer des messages avec responseStream.WriteAsync. Un appel de diffusion en continu bidirectionnelle est terminé lorsque la méthode retourne :

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

Le code précédent :

  • Envoie une réponse à chaque requête.
  • Est une utilisation de base de la diffusion en continu bidirectionnelle.

Il est possible de prendre en charge des scénarios plus complexes, tels que la lecture des requêtes et l’envoi de réponses simultanément :

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Dans une méthode de diffusion en continu bidirectionnelle, le client et le service peuvent s’envoyer des messages l’un à l’autre à tout moment. La meilleure implémentation d’une méthode bidirectionnelle varie en fonction des exigences.

Accéder aux en-têtes de requête gRPC

Un message de requête n’est pas le seul moyen pour un client d’envoyer des données à un service gRPC. Les valeurs d’en-tête sont disponibles dans un service à l’aide de ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Multithreading avec des méthodes de diffuser en continu gRPC

Quelques points importants concernant l’implémentation de méthodes de diffusion en continu gRPC utilisant plusieurs threads doivent être pris en compte.

Sécurité des threads du lecteur et de l’enregistreur

IAsyncStreamReader<TMessage> et IServerStreamWriter<TMessage> peuvent être utilisés par un seul thread à la fois. Pour une méthode gRPC de diffusion en continu, plusieurs threads ne peuvent pas lire de nouveaux messages en même temps que requestStream.MoveNext(). De même, plusieurs threads ne peuvent pas écrire de nouveaux messages en même temps que responseStream.WriteAsync(message).

Un moyen sûr de permettre à plusieurs threads d’interagir avec une méthode gRPC consiste à utiliser le modèle producteur-consommateur avec System.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

La méthode de diffusion en continu du serveur gRPC précédente :

  • Crée un canal délimité pour la production et la consommation de messages DataResult.
  • Démarre une tâche de lecture des messages du canal et de leur écriture dans le flux de réponses.
  • Écrit des messages dans le canal à partir de plusieurs threads.

Notes

Les méthodes de diffusion en continu bidirectionnelle prennent IAsyncStreamReader<TMessage> et IServerStreamWriter<TMessage> comme arguments. Il est sûr d’utiliser ces types sur des threads distincts les uns des autres.

Interaction avec une méthode gRPC après la fin d’un appel

Un appel gRPC se termine sur le serveur une fois la méthode gRPC s’arrête. Les arguments suivants passés aux méthodes gRPC ne peuvent pas être utilisés en toute sécurité une fois l’appel terminé :

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Si une méthode gRPC démarre des tâches en arrière-plan qui utilisent ces types, elle doit terminer les tâches avant l’arrêt de la méthode gRPC. Le fait de continuer à utiliser le contexte, le lecteur ou l’enregistreur de flux alors que la méthode gRPC existe entraîne des erreurs et un comportement imprévisible.

Dans l’exemple suivant, la méthode de diffusion en continu du serveur peut écrire dans le flux de réponses une fois l’appel terminé :

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

Dans l’exemple précédent, la solution consiste à attendre la tâche d’écriture avant de quitter la méthode :

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Ressources supplémentaires