Teilen über


Erstellen von gRPC-Diensten und -Methoden

Von James Newton-King

In diesem Artikel wird erläutert, wie Sie gRPC-Dienste und -Methoden in C# erstellen. Dabei werden folgende Themen behandelt:

  • Definieren von Diensten und Methoden in .proto-Dateien
  • Generieren von Code mithilfe von gRPC-C#-Tools
  • Implementieren von gRPC-Diensten und -Methoden

Erstellen neuer gRPC-Dienste

gRPC-Dienste mit C# haben den Contract-First-Ansatz von gRPC an die API-Entwicklung eingeführt. Dienste und Nachrichten werden in .proto-Dateien definiert. C#-Tools generieren dann Code aus den .proto-Dateien. Bei serverseitigen Ressourcen wird pro Dienst ein abstrakter Basistyp generiert, zusammen mit Klassen für die einzelnen Nachrichten.

Die folgende .proto-Datei:

  • Die Datei definiert einen Greeter-Dienst.
  • Der Greeter-Dienst definiert einen SayHello-Aufruf.
  • SayHello sendet eine HelloRequest-Nachricht und empfängt eine HelloReply-Nachricht.
syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

C#-Tools generieren den GreeterBase-C#-Basistyp:

public abstract partial class GreeterBase
{
    public virtual Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        throw new RpcException(new Status(StatusCode.Unimplemented, ""));
    }
}

public class HelloRequest
{
    public string Name { get; set; }
}

public class HelloReply
{
    public string Message { get; set; }
}

Standardmäßig hat der generierte GreeterBase-Basistyp keine Aufgabe. Die dazugehörige virtuelle SayHello-Methode gibt einen UNIMPLEMENTED-Fehler für Clients zurück, die den Typ aufrufen. Damit der Dienst hilfreich ist, muss eine App eine konkrete Implementierung von GreeterBase erstellen:

public class GreeterService : GreeterBase
{
    public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        return Task.FromResult(new HelloReply { Message = $"Hello {request.Name}" });
    }
}

ServerCallContext gibt den Kontext für einen serverseitigen Aufruf an.

Die Dienstimplementierung wird für die App registriert. Wenn der Dienst vom ASP.NET Core-gRPC gehostet wird, sollte er mit der MapGrpcService-Methode der Routingpipeline hinzugefügt werden.

app.MapGrpcService<GreeterService>();

Weitere Informationen finden Sie unter gRPC-Dienste mit ASP.NET Core.

Implementieren von gRPC-Methoden

Ein gRPC-Dienst kann verschiedene Methodentypen aufweisen. Das Senden und Empfangen von Nachrichten durch einen Dienst hängt vom Typ der definierten Methode ab. Es gibt die folgenden gRPC-Methodentypen:

  • Unär
  • Serverstreaming
  • Clientstreaming
  • Bidirektionales Streaming

Streamingaufrufe werden mit dem Schlüsselwort stream in der Datei .proto angegeben. stream kann für die Anforderungsnachricht, Antwortnachricht oder beides eines Aufrufs verwendet werden.

syntax = "proto3";

service ExampleService {
  // Unary
  rpc UnaryCall (ExampleRequest) returns (ExampleResponse);

  // Server streaming
  rpc StreamingFromServer (ExampleRequest) returns (stream ExampleResponse);

  // Client streaming
  rpc StreamingFromClient (stream ExampleRequest) returns (ExampleResponse);

  // Bi-directional streaming
  rpc StreamingBothWays (stream ExampleRequest) returns (stream ExampleResponse);
}

Jeder Aufruftyp weist eine andere Methodensignatur auf. Das Überschreiben generierter Methoden aus dem Diensttyp mit abstrakter Basis in einer konkreten Implementierung sorgt dafür, dass die richtigen Argumente und Rückgabetypen verwendet werden.

Unäre Methode

Eine unäre Methode weist die Anforderungsnachricht als Parameter auf und gibt die Antwort zurück. Ein unärer Aufruf ist abgeschlossen, wenn die Antwort zurückgegeben wurde.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var response = new ExampleResponse();
    return Task.FromResult(response);
}

Unäre Aufrufe ähneln Aktionen für Web-API-Controller am stärksten. Ein wichtiger Unterschied zwischen gRPC-Methoden und Aktionen ist der, dass gRPC-Methoden keine Teile einer Anforderung an andere Methodenargumente binden können. gRPC-Methoden weisen immer ein Nachrichtenargument für die eingehenden Anforderungsdaten auf. Es können weiterhin mehrere Werte an einen gRPC-Dienst gesendet werden, indem der Anforderungsnachricht Felder hinzugefügt werden:

message ExampleRequest {
    int32 pageIndex = 1;
    int32 pageSize = 2;
    bool isDescending = 3;
}

Serverstreamingmethode

Eine Serverstreamingmethode weist die Anforderungsnachricht als Parameter auf. Da mehrere Nachrichten zurück zum Aufrufer gestreamt werden können, wird responseStream.WriteAsync verwendet, um Antwortnachrichten zu senden. Der Serverstreamingaufruf ist abgeschlossen, wenn die Methode etwas zurückgibt.

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    for (var i = 0; i < 5; i++)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Der Client hat keine Möglichkeit, weitere Nachrichten oder Daten zu senden, sobald die Serverstreamingmethode gestartet wurde. Einige Streamingmethoden sind so entworfen, dass sie fortlaufend ausgeführt werden. Bei fortlaufenden Streamingmethoden kann ein Client den Aufruf abbrechen, wenn dieser länger dauert als nötig. Bei einem Abbruch sendet der Client ein Signal an den Server, und ServerCallContext.CancellationToken wird ausgelöst. Das CancellationToken-Token sollte für den Server mit asynchronen Methoden verwendet werden, sodass Folgendes möglich ist:

  • Asynchrone Aufgaben werden zusammen mit dem Streamingaufruf abgebrochen.
  • Die Methode kann schnell beendet werden.
public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    while (!context.CancellationToken.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Clientstreamingmethode

Eine Clientstreamingmethode beginnt, ohne dass die Methode eine Nachricht empfängt. Der requestStream-Parameter wird verwendet, um Nachrichten des Clients zu lesen. Ein Clientstreamingaufruf ist abgeschlossen, wenn eine Antwortnachricht zurückgegeben wird:

public override async Task<ExampleResponse> StreamingFromClient(
    IAsyncStreamReader<ExampleRequest> requestStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        // ...
    }
    return new ExampleResponse();
}

Bidirektionale Streamingmethode

Eine bidirektionale Streamingmethode beginnt, ohne dass die Methode eine Nachricht empfängt. Der requestStream-Parameter wird verwendet, um Nachrichten des Clients zu lesen. Die Methode kann Nachrichten mit responseStream.WriteAsync senden. Ein bidirektionaler Streamingaufruf ist abgeschlossen, wenn die Methode beendet wird:

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    await foreach (var message in requestStream.ReadAllAsync())
    {
        await responseStream.WriteAsync(new ExampleResponse());
    }
}

Der vorangehende Code:

  • Sendet eine Antwort pro Anforderung.
  • Ist eine einfache Verwendung des bidirektionalen Streamings.

Es ist möglich, auch komplexere Szenarios zu unterstützen, z. B. das gleichzeitige Lesen von Anforderungen und Senden von Antworten.

public override async Task StreamingBothWays(IAsyncStreamReader<ExampleRequest> requestStream,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    // Read requests in a background task.
    var readTask = Task.Run(async () =>
    {
        await foreach (var message in requestStream.ReadAllAsync())
        {
            // Process request.
        }
    });

    // Send responses until the client signals that it is complete.
    while (!readTask.IsCompleted)
    {
        await responseStream.WriteAsync(new ExampleResponse());
        await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
    }
}

Bei einer bidirektionalen Streamingmethode können sich der Client und der Dienst jederzeit gegenseitig Nachrichten senden. Die beste Implementierung einer bidirektionalen Methode variiert je nach Anforderungen.

Zugriff auf gRPC-Anforderungsheader

Eine Anforderungsnachricht ist nicht die einzige Möglichkeit für einen Client, Daten an einen gRPC-Dienst zu senden. Headerwerte stehen in einem Dienst mithilfe von ServerCallContext.RequestHeaders zur Verfügung.

public override Task<ExampleResponse> UnaryCall(ExampleRequest request,
    ServerCallContext context)
{
    var userAgent = context.RequestHeaders.GetValue("user-agent");
    // ...

    return Task.FromResult(new ExampleResponse());
}

Multithreading mit gRPC-Streamingmethoden

Es gibt wichtige Überlegungen zum Implementieren von gRPC-Streamingmethoden, die mehrere Threads verwenden.

Lese- und Schreibthreadsicherheit

IAsyncStreamReader<TMessage> und IServerStreamWriter<TMessage> können jeweils nur von einem Thread verwendet werden. Bei einer gRPC-Streamingmethode können mehrere Threads keine neuen Nachrichten mit requestStream.MoveNext() gleichzeitig lesen. Und mehrere Threads können keine neuen Nachrichten mit responseStream.WriteAsync(message) gleichzeitig schreiben.

Eine sichere Möglichkeit, mehrere Threads zur Interaktion mit einer gRPC-Methode zu ermöglichen, besteht darin, das Produzenten-Consumer-Muster mit System.Threading.Channels zu verwenden.

public override async Task DownloadResults(DataRequest request,
        IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
{
    var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5));

    var consumerTask = Task.Run(async () =>
    {
        // Consume messages from channel and write to response stream.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await responseStream.WriteAsync(message);
        }
    });

    var dataChunks = request.Value.Chunk(size: 10);

    // Write messages to channel from multiple threads.
    await Task.WhenAll(dataChunks.Select(
        async c =>
        {
            var message = new DataResult { BytesProcessed = c.Length };
            await channel.Writer.WriteAsync(message);
        }));

    // Complete writing and wait for consumer to complete.
    channel.Writer.Complete();
    await consumerTask;
}

Die vorherige gRPC-Serverstreamingmethode:

  • Erstellt einen gebundenen Kanal zum Produzieren und Verwenden von DataResult Nachrichten.
  • Startet eine Aufgabe, um Nachrichten aus dem Kanal zu lesen und in den Antwortdatenstrom zu schreiben.
  • Schreibt Nachrichten aus mehreren Threads in den Kanal.

Hinweis

Bidirektionale Streamingmethoden nehmen IAsyncStreamReader<TMessage> und IServerStreamWriter<TMessage> als Argumente. Diese Typen können sicher für voneinander separate Threads verwendet werden.

Interagieren mit einer gRPC-Methode nach Beendigung eines Aufrufs

Ein gRPC-Aufruf endet auf dem Server, sobald die gRPC-Methode beendet wird. Die folgenden Argumente, die an gRPC-Methoden übergeben werden, können nicht sicher verwendet werden, nachdem der Aufruf beendet wurde:

  • ServerCallContext
  • IAsyncStreamReader<TMessage>
  • IServerStreamWriter<TMessage>

Wenn eine gRPC-Methode Hintergrundaufgaben startet, die diese Typen verwenden, müssen die Aufgaben abgeschlossen werden, bevor die gRPC-Methode beendet wird. Wenn der Kontext, der Streamleser oder der Stream-Writer weiterhin verwendet wird, nachdem die gRPC-Methode beendet wurde, treten Fehler und unvorhersehbares Verhalten auf.

Im folgenden Beispiel könnte die Serverstreamingmethode nach Abschluss des Aufrufs in den Antwortdatenstrom schreiben:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    _ = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();
}

Im vorherigen Beispiel wartet die Lösung auf die Schreibaufgabe, bevor die Methode beendet wird:

public override async Task StreamingFromServer(ExampleRequest request,
    IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
    var writeTask = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await responseStream.WriteAsync(new ExampleResponse());
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    });

    await PerformLongRunningWorkAsync();

    await writeTask;
}

Zusätzliche Ressourcen