Condividi tramite


Creare servizi e metodi gRPC

Di James Newton-King

Questo documento illustra come creare servizi e metodi gRPC in C#. Gli argomenti includono:

  • Come definire i servizi e i metodi nei .proto file.
  • Codice generato con gli strumenti GRPC C#.
  • Implementazione di metodi e servizi gRPC.

Creare nuovi servizi gRPC

I servizi gRPC con C# hanno introdotto l'approccio contract-first di gRPC per lo sviluppo di API. I servizi e i messaggi sono definiti nei .proto file. Gli strumenti C# generano quindi codice dai .proto file. Per gli asset lato server, viene generato un tipo di base astratto per ogni servizio, insieme alle classi per tutti i messaggi.

Il file seguente .proto :

  • Definisce un Greeter servizio.
  • Il Greeter servizio definisce una SayHello chiamata.
  • SayHello invia un HelloRequest messaggio e riceve un HelloReply messaggio
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Gli strumenti C# generano il tipo di base C# GreeterBase :

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

Per impostazione predefinita, l'oggetto generato GreeterBase non esegue alcuna operazione. Il metodo virtuale SayHello restituirà un UNIMPLEMENTED errore a tutti i client che la chiamano. Affinché il servizio sia utile, un'app deve creare un'implementazione concreta di GreeterBase:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

fornisce ServerCallContext il contesto per una chiamata lato server.

L'implementazione del servizio viene registrata con l'app. Se il servizio è ospitato da ASP.NET Core gRPC, deve essere aggiunto alla pipeline di routing con il MapGrpcService metodo .

app.MapGrpcService<GreeterService>();

Per altre informazioni, vedere gRPC services with ASP.NET Core (Servizi gRPC con ASP.NET Core ).

Implementare metodi gRPC

Un servizio gRPC può avere diversi tipi di metodi. Il modo in cui i messaggi vengono inviati e ricevuti da un servizio dipende dal tipo di metodo definito. I tipi di metodo gRPC sono:

  • Unario
  • Streaming del server
  • Streaming client
  • Streaming bidirezionale

Le chiamate di streaming vengono specificate con la stream parola chiave nel .proto file. stream può essere inserito nel messaggio di richiesta di una chiamata, nel messaggio di risposta o in entrambi.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Ogni tipo di chiamata ha una firma del metodo diversa. L'override dei metodi generati dal tipo di servizio di base astratto in un'implementazione concreta garantisce che vengano usati gli argomenti corretti e il tipo restituito.

Metodo unario

Un metodo unario ha il messaggio di richiesta come parametro e restituisce la risposta. Una chiamata unaria viene completata quando viene restituita la risposta.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Le chiamate unarie sono le azioni più simili ai controller API Web. Una differenza importante dei metodi gRPC da azioni è che i metodi gRPC non sono in grado di associare parti di una richiesta a argomenti di metodo diversi. I metodi gRPC hanno sempre un argomento di messaggio per i dati della richiesta in ingresso. È comunque possibile inviare più valori a un servizio gRPC aggiungendo campi al messaggio di richiesta:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Metodo di streaming del server

Un metodo di streaming del server ha il messaggio di richiesta come parametro. Poiché più messaggi possono essere trasmessi al chiamante, responseStream.WriteAsync viene usato per inviare messaggi di risposta. Una chiamata di streaming del server viene completata al termine del metodo .

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Il client non è in grado di inviare messaggi o dati aggiuntivi dopo l'avvio del metodo di streaming del server. Alcuni metodi di streaming sono progettati per essere eseguiti per sempre. Per i metodi di streaming continuo, un client può annullare la chiamata quando non è più necessaria. Quando si verifica l'annullamento, il client invia un segnale al server e viene generato ServerCallContext.CancellationToken . Il CancellationToken token deve essere usato nel server con metodi asincroni in modo che:

  • Qualsiasi lavoro asincrono viene annullato insieme alla chiamata di streaming.
  • Il metodo viene chiuso rapidamente.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Metodo di streaming client

Un metodo di streaming client viene avviato senza che il metodo riceva un messaggio. Il requestStream parametro viene usato per leggere i messaggi dal client. Una chiamata di streaming client viene completata quando viene restituito un messaggio di risposta:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Metodo di streaming bidirezionale

Un metodo di streaming bidirezionale inizia senza che il metodo riceva un messaggio. Il requestStream parametro viene usato per leggere i messaggi dal client. Il metodo può scegliere di inviare messaggi con responseStream.WriteAsync. Una chiamata di streaming bidirezionale viene completata quando il metodo restituisce:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

Il codice precedente:

  • Invia una risposta per ogni richiesta.
  • Utilizzo di base dello streaming bidirezionale.

È possibile supportare scenari più complessi, ad esempio la lettura delle richieste e l'invio simultaneo di risposte:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

In un metodo di streaming bidirezionale, il client e il servizio possono inviare messaggi tra loro in qualsiasi momento. L'implementazione migliore di un metodo bidirezionale varia a seconda dei requisiti.

Accedere alle intestazioni della richiesta gRPC

Un messaggio di richiesta non è l'unico modo per un client di inviare dati a un servizio gRPC. I valori di intestazione sono disponibili in un servizio usando ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Multithreading con metodi di streaming gRPC

Esistono considerazioni importanti per l'implementazione di metodi di streaming gRPC che usano più thread.

Thread safety lettore e writer

IAsyncStreamReader<TMessage> e IServerStreamWriter<TMessage> possono essere usati da un solo thread alla volta. Per un metodo gRPC di streaming, più thread non possono leggere nuovi messaggi contemporaneamente requestStream.MoveNext() . E più thread non possono scrivere nuovi messaggi contemporaneamente responseStream.WriteAsync(message) .

Un modo sicuro per consentire a più thread di interagire con un metodo gRPC consiste nell'usare il modello producer-consumer con System.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

Metodo di streaming del server gRPC precedente:

  • Crea un canale delimitato per la produzione e l'utilizzo di DataResult messaggi.
  • Avvia un'attività per leggere i messaggi dal canale e scriverli nel flusso di risposta.
  • Scrive messaggi nel canale da più thread.

Nota

I metodi di streaming bidirezionali accettano IAsyncStreamReader<TMessage> e IServerStreamWriter<TMessage> come argomenti. È possibile usare questi tipi in thread separati l'uno dall'altro.

Interazione con un metodo gRPC al termine di una chiamata

Una chiamata gRPC termina sul server dopo l'uscita dal metodo gRPC. Gli argomenti seguenti passati ai metodi gRPC non sono sicuri da usare al termine della chiamata:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Se un metodo gRPC avvia attività in background che usano questi tipi, deve completare le attività prima dell'uscita dal metodo gRPC. Continuando a usare il contesto, il lettore di flusso o il writer di flusso dopo che il metodo gRPC esiste, causa errori e comportamenti imprevedibili.

Nell'esempio seguente il metodo di streaming del server potrebbe scrivere nel flusso di risposta al termine della chiamata:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

Per l'esempio precedente, la soluzione deve attendere l'attività di scrittura prima di uscire dal metodo :

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Risorse aggiuntive