Поделиться через


Создание служб и методов gRPC

Автор: Джеймс Ньютон-Кинг (James Newton-King)

В этом документе объясняется, как создать службы и методы gRPC в C#. Разделы включают:

  • Определение служб и методов в .proto файлах.
  • Код, созданный с помощью инструментов gRPC в C#.
  • Реализация служб и методов gRPC.

Создание новых служб gRPC

Службы gRPC на языке C# предоставляют подход к разработке API на основе контракта gRPC. Службы и сообщения определены в .proto файлах. Затем средство C# создает код из .proto файлов. Для ресурсов на стороне сервера абстрактный базовый тип создается для каждой службы вместе с классами для любых сообщений.

Следующий файл .proto:

  • Определяет службу Greeter.
  • Служба Greeter определяет вызов SayHello.
  • SayHello отправляет сообщение HelloRequest и получает сообщение HelloReply.
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Средства C# создают базовый тип C# GreeterBase.

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

По умолчанию созданный GreeterBase не выполняет никаких действий. Его виртуальный метод SayHello возвращает ошибку UNIMPLEMENTED для всех клиентов, которые ее вызывают. Чтобы службу можно было использовать, приложение должно создать конкретную реализацию GreeterBase.

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext предоставляет контекст для вызова на стороне сервера.

Реализация службы регистрируется в приложении. Если служба размещена ASP.NET Core gRPC, ее следует добавить в конвейер маршрутизации с помощью метода MapGrpcService.

app.MapGrpcService<GreeterService>();

Дополнительные сведения см. в статье Службы gRPC в ASP.NET Core.

Реализация методов gRPC

Служба gRPC может иметь различные типы методов. Способ отправки и получения сообщений службой зависит от типа определенного метода. Типы методов gRPC:

  • Унарный
  • Потоковая передача сервера
  • Потоковая передача клиента
  • Двунаправленная потоковая передача

Потоковые вызовы указываются с помощью ключевого слова stream в файле .proto. stream можно поместить в сообщение запроса вызова, ответное сообщение или и в то и в другое.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Каждый тип вызова имеет разную сигнатуру метода. Переопределение созданных методов из абстрактного базового типа службы в конкретной реализации гарантирует, что используются правильные аргументы и возвращаемый тип.

Унарный метод

Унарный метод использует сообщение запроса в качестве параметра и возвращает ответ. При возвращении ответа будет выполнен унарный вызов.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Унарные вызовы больше всего похожи на действия на контроллерах веб-API. Одно важное отличие от gRPC-методов заключается в том, что методы gRPC не могут привязывать части запроса к разным аргументам метода. Методы gRPC всегда имеют один аргумент message для данных входящего запроса. Несколько значений по-прежнему можно отправить в службу gRPC, добавив поля в сообщение запроса:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Метод потоковой передачи сервера

Метод потоковой передачи сервера использует сообщение запроса в виде параметра. Так как несколько сообщений можно передать обратно вызывающему объекту, для отправки ответных сообщений используется responseStream.WriteAsync. Вызов потоковой передачи сервера завершается, когда метод возвращает результат.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Клиент не может отправить дополнительные сообщения или данные после запуска метода потоковой передачи сервера. Некоторые методы потоковой передачи предназначены для непрерывного выполнения. Для методов непрерывной потоковой передачи клиент может отменить вызов, если он больше не нужен. При отмене клиент отправляет сигнал на сервер и вызывается ServerCallContext.CancellationToken. Токен CancellationToken должен использоваться на сервере с асинхронными методами, чтобы:

  • любая асинхронная работа отменялась вместе с вызовом потоковой передачи;
  • метод быстро завершал свою работу.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Метод потоковой передачи клиента

Метод потоковой передачи клиента запускается без метода, получающего сообщение. Параметр requestStream используется для чтения сообщений от клиента. Метод потоковой передачи клиента завершается при возвращении ответного сообщения.

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Метод двунаправленной потоковой передачи

Метод двунаправленной потоковой передачи запускается без метода, получающего сообщение. Параметр requestStream используется для чтения сообщений от клиента. Клиент может выбрать отправку сообщений с помощью responseStream.WriteAsync. Вызов двунаправленной потоковой передачи завершается, когда метод возвращает:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

Предыдущий код:

  • Отправляет ответ для каждого запроса.
  • Является базовым сценарием использования двунаправленной потоковой передачи.

Существует возможность поддержки более сложных сценариев, таких как чтение запросов и отправка ответов одновременно.

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

В методе двунаправленной потоковой передачи клиент и служба могут обмениваться сообщениями в любое время. Оптимальная реализация двунаправленного метода зависит от требований.

Доступ к заголовкам запроса gRPC

Сообщение запроса — не единственный способ отправки данных клиентом в службу gRPC. Значения заголовков доступны в службе с помощью ServerCallContext.RequestHeaders.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Многопотоковое использование с помощью методов gRPC потоковой передачи

Реализации методов gRPC потоковой передачи с несколькими потоками имеет важные аспекты.

Безопасность потоков чтения и записи

Несколько потоков не могут использовать одновременно ни IAsyncStreamReader<TMessage>, ни IServerStreamWriter<TMessage>. При выполнении метода gRPC потоковой передачи несколько потоков не могут одновременно считать новые сообщения с использованием requestStream.MoveNext(). Несколько потоков также не могут одновременно записывать новые сообщения с помощью responseStream.WriteAsync(message).

Безопасный способ обеспечить взаимодействие нескольких потоков с использованием метода gRPC — применить шаблон "производитель — потребитель" с System.Threading.Channels.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

Предыдущий метод gRPC потоковой передачи сервера:

  • создает ограниченный канал для создания и использования сообщений DataResult;
  • запускает задачу чтения сообщений из канала и их записи в поток ответа;
  • записывает сообщения в канал из нескольких потоков.

Примечание.

Двунаправленные методы потоковой передачи принимают IAsyncStreamReader<TMessage> и IServerStreamWriter<TMessage> в качестве аргументов. Эти типы можно использовать в потоках, отдельных друг от друга.

Взаимодействие с методом gRPC после завершения вызова

Вызов gRPC завершается на сервере после завершения работы метода gRPC. Следующие аргументы, передаваемые методам gRPC, небезопасно использовать после завершения вызова:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Если метод gRPC запускает фоновые задачи, использующие эти типы, он должен завершить задачи, прежде чем метод gRPC завершит работу. Если существует метод gRPC, средство чтения или записи потоков создает ошибки и непредсказуемое поведение, продолжая использовать контекст.

В следующем примере метод потоковой передачи сервера может выполнять запись в поток ответа после завершения вызова:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

В предыдущем примере решение ожидает задачу записи перед выходом из метода:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Дополнительные ресурсы