Sdílet prostřednictvím


Vytváření služeb a metod gRPC

Poznámka:

Toto není nejnovější verze tohoto článku. Aktuální verzi najdete ve verzi .NET 8 tohoto článku.

Upozorňující

Tato verze ASP.NET Core se už nepodporuje. Další informace najdete v tématu .NET a .NET Core Zásady podpory. Aktuální verzi najdete ve verzi .NET 8 tohoto článku.

Důležité

Tyto informace se týkají předběžného vydání produktu, který může být podstatně změněn před komerčním vydáním. Microsoft neposkytuje žádné záruky, výslovné ani předpokládané, týkající se zde uváděných informací.

Aktuální verzi najdete ve verzi .NET 8 tohoto článku.

Autor: James Newton-King

Tento dokument vysvětluje, jak vytvořit služby a metody gRPC v jazyce C#. Témata:

  • Jak definovat služby a metody v .proto souborech
  • Vygenerovaný kód pomocí nástrojů gRPC C#.
  • Implementace služeb a metod gRPC

Vytvoření nových služeb gRPC

Služby gRPC s jazykem C# zavedly přístup gRPC pro vývoj rozhraní API jako první kontrakt. Služby a zprávy jsou definovány v .proto souborech. Nástroje jazyka C# pak vygenerují kód ze .proto souborů. U prostředků na straně serveru se pro každou službu generuje abstraktní základní typ spolu s třídami pro všechny zprávy.

.proto Následující soubor:

  • Definuje Greeter službu.
  • Služba Greeter definuje SayHello volání.
  • SayHelloHelloRequest odešle zprávu a obdrží HelloReply zprávu.
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Nástroje jazyka C# generují základní typ jazyka C# GreeterBase :

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

Ve výchozím nastavení vygenerované GreeterBase nic nedělá. Její virtuální SayHello metoda vrátí UNIMPLEMENTED chybu všem klientům, kteří ji volají. Aby služba byla užitečná, musí aplikace vytvořit konkrétní implementaci GreeterBase:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

Poskytuje ServerCallContext kontext pro volání na straně serveru.

Implementace služby je zaregistrovaná v aplikaci. Pokud je služba hostovaná ASP.NET Core gRPC, měla by se přidat do kanálu směrování pomocí MapGrpcService metody.

app.MapGrpcService<GreeterService>();

Další informace najdete v tématu služby gRPC s ASP.NET Core .

Implementace metod gRPC

Služba gRPC může mít různé typy metod. Způsob odesílání a přijetí zpráv službou závisí na typu definované metody. Typy metod gRPC jsou:

  • Unární
  • Streamování serveru
  • Streamování klienta
  • Obousměrné streamování

Volání streamování se zadají pomocí klíčového stream slova v .proto souboru. stream lze umístit na zprávu žádosti o hovor, odpověď nebo obojí.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Každý typ volání má jiný podpis metody. Přepsání generovaných metod z abstraktního základního typu služby v konkrétní implementaci zajišťuje použití správných argumentů a návratového typu.

Unární metoda

Unární metoda má zprávu požadavku jako parametr a vrátí odpověď. Unární volání je dokončeno po vrácení odpovědi.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Unární volání jsou nejvíce podobná akcím na řadičích webového rozhraní API. Jedním z důležitých rozdílů metod gRPC je, že metody gRPC nemůžou svázat části požadavku s různými argumenty metody. Metody gRPC vždy obsahují jeden argument zprávy pro příchozí data požadavku. Do služby gRPC je stále možné odeslat více hodnot přidáním polí do zprávy požadavku:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Metoda streamování serveru

Metoda streamování serveru má jako parametr zprávu požadavku. Vzhledem k tomu, že se do volajícího dá streamovat více zpráv, responseStream.WriteAsync slouží k odesílání zpráv s odpověďmi. Volání streamování serveru je dokončeno, když metoda vrátí.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Klient nemá způsob, jak po spuštění metody streamování serveru odesílat další zprávy nebo data. Některé metody streamování jsou navržené tak, aby běžely navždy. U metod průběžného streamování může klient zrušit volání, když už není potřeba. Když dojde ke zrušení, klient odešle signál na server a ServerCallContext.CancellationToken je vyvolán . Token CancellationToken by měl být použit na serveru s asynchronními metodami, aby:

  • Všechna asynchronní práce se zruší společně s voláním streamování.
  • Metoda se rychle ukončí.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Metoda streamování klienta

Metoda streamování klienta se spustí bez toho, aby metoda přijímala zprávu. Parametr requestStream se používá ke čtení zpráv z klienta. Volání streamování klienta se dokončí, když se vrátí zpráva odpovědi:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Obousměrná metoda streamování

Obousměrná metoda streamování se spustí bez toho, aby metoda přijímala zprávu. Parametr requestStream se používá ke čtení zpráv z klienta. Metoda může zvolit odesílání zpráv pomocí responseStream.WriteAsync. Obousměrné volání streamování je dokončeno, když metoda vrátí:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

Předchozí kód:

  • Odešle odpověď pro každý požadavek.
  • Je základním využitím obousměrného streamování.

Je možné podporovat složitější scénáře, například čtení požadavků a odesílání odpovědí současně:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

V obousměrné metodě streamování může klient a služba kdykoli odesílat zprávy mezi sebou. Nejlepší implementace obousměrné metody se liší v závislosti na požadavcích.

Hlavičky požadavku gRPC pro Access

Zpráva požadavku není jediným způsobem, jak klient odesílat data do služby gRPC. Hodnoty hlaviček jsou ve službě k dispozici pomocí ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Více vláken pomocí metod streamování gRPC

Při implementaci metod streamování gRPC, které používají více vláken, je potřeba vzít v úvahu důležité aspekty.

Zabezpečení vláken pro čtení a zápis

IAsyncStreamReader<TMessage> a IServerStreamWriter<TMessage> každý může být používán pouze jedním vláknem najednou. U metody gRPC streamování nemůže více vláken číst nové zprávy requestStream.MoveNext() současně. A více vláken nemůže psát nové zprávy současně responseStream.WriteAsync(message) .

Bezpečným způsobem, jak povolit interakci s metodou gRPC více vláken, je použití vzoru producenta se system.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

Předchozí metoda streamování serveru gRPC:

  • Vytvoří ohraničený kanál pro vytváření a využívání DataResult zpráv.
  • Spustí úlohu pro čtení zpráv z kanálu a jejich zápis do streamu odpovědí.
  • Zapisuje zprávy do kanálu z více vláken.

Poznámka:

Obousměrné metody streamování přebírají IAsyncStreamReader<TMessage> jako IServerStreamWriter<TMessage> argumenty. Je bezpečné používat tyto typy na samostatných vláknech od sebe.

Interakce s metodou gRPC po ukončení volání

Volání gRPC skončí na serveru po ukončení metody gRPC. Následující argumenty předané metodám gRPC nejsou bezpečné použít po ukončení volání:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Pokud metoda gRPC spustí úlohy na pozadí, které tyto typy používají, musí dokončit úlohy před ukončením metody gRPC. Pokračování v používání kontextu, čtenáře datových proudů nebo zapisovače datových proudů po existenci metody gRPC způsobuje chyby a nepředvídatelné chování.

V následujícím příkladu by metoda streamování serveru mohla po dokončení volání zapisovat do streamu odpovědi:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

V předchozím příkladu řešení před ukončením metody očekává úlohu zápisu:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Další materiály