创建 gRPC 客户端库

提示

此内容摘自电子书《为 Azure 构建云原生 .NET 应用程序》,可在 .NET 文档上获取,也可作为免费可下载的 PDF 脱机阅读。

《为 Azure 构建云原生 .NET 应用程序》电子书封面缩略图。

不需要为 gRPC 应用程序分发客户端库。 可以在组织中创建 .proto 文件的共享库,其他团队可以使用这些文件在其自己的项目中生成客户端代码。 但是,如果你具有专用 NuGet 存储库,并且许多其他团队在使用 .NET,则可以在你的服务项目中创建和发布客户端 NuGet 包。 此方法可能是共享和推广服务的一种好方式。

分发客户端库的一个优点是,可以使用有用的“便利”方法和属性来增强生成的 gRPC 和 Protobuf 类。 在客户端代码中,与在服务器中一样,所有类都声明为 partial,因此无需编辑生成的代码即可扩展它们。 此行为意味着很容易将构造函数、方法和计算属性添加到基本类型。

注意

不应使用自定义代码提供基本功能。 你不会希望将基本功能限制到使用共享库的 .NET 团队,而不将它提供给使用其他语言或平台(如 Python 或 Java)的团队。

确保尽可能多的团队可以访问 gRPC 服务。 实现此功能的最佳方法是共享 .proto 文件,以便开发人员可以生成自己的客户端。 此方法在多平台环境中尤其适用,在此环境中,不同的团队经常使用不同的编程语言和框架,或者可从外部访问你的 API。

有用的扩展

.NET 中有两个常用接口用于处理对象流:IEnumerable<T>IObservable<T>。 从 .NET Core 3.0 和 C# 8.0 开始,有用于异步处理流的 IAsyncEnumerable<T> 接口,以及供使用该接口的 await foreach 语法。 此部分提供用于将这些接口应用于 gRPC 流的可重用代码。

使用 .NET gRPC 客户端库时,IAsyncStreamReader<T> 有一个 ReadAllAsync 扩展方法,可创建 IAsyncEnumerable<T> 接口。 对于使用反应式编程的开发人员,创建 IObservable<T> 接口的等效扩展方法可能类似于以下部分中的示例。

IObservable

IObservable<T> 接口是 IEnumerable<T> 的“反应式”逆接口。 反应式方法允许流将项推送到订阅者,而不是从流中拉取项。 此行为与 gRPC 流非常相似,并且易于将接口 IObservable<T> 包装围绕 IAsyncStreamReader<T> 接口进行包装。

此代码比 IAsyncEnumerable<T> 代码长,因为 C# 没有用于处理可观察对象的内置支持。 必须手动创建实现类。 不过,它是一个泛型类,因此单个实现适用于所有类型。

namespace Grpc.Core;

public class GrpcStreamObservable<T> : IObservable<T>
{
    private readonly IAsyncStreamReader<T> _reader;
    private readonly CancellationToken _token;
    private int _used;

    public GrpcStreamObservable(IAsyncStreamReader<T> reader, CancellationToken token = default)
    {
        _reader = reader ?? throw new ArgumentNullException(nameof(reader));
        _token = token;
        _used = 0;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        Interlocked.Exchange(ref _used, 1) == 0
            ? new GrpcStreamSubscription<T>(_reader, observer, _token)
            : throw new InvalidOperationException("Subscribe can only be called once.");

}

重要

此可观察对象实现仅允许调用 Subscribe 方法一次,因为让多个订阅者尝试从流读取会导致混沌。 有一些运算符(如 System.Reactive.Linq 中的 Replay)可实现可观察对象的缓冲和可重复共享,可以与此实现一起使用。

GrpcStreamSubscription 类处理 IAsyncStreamReader 的枚举:

public class GrpcStreamSubscription<T> : IDisposable
{
    private readonly IAsyncStreamReader<T> _reader;
    private readonly IObserver<T> _observer;

    private readonly CancellationTokenSource _tokenSource;

    private readonly Task _task;

    private bool _completed;

    public GrpcStreamSubscription(IAsyncStreamReader<T> reader, IObserver<T> observer, CancellationToken token = default)
    {
        _reader = reader ?? throw new ArgumentNullException(nameof(reader));
        _observer = observer ?? throw new ArgumentNullException(nameof(observer));

        _tokenSource = new CancellationTokenSource();
        token.Register(_tokenSource.Cancel);

        _task = Run(_tokenSource.Token);
    }

    private async Task Run(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                if (!await _reader.MoveNext(token)) break;
            }
            catch (RpcException e) when (e.StatusCode == Grpc.Core.StatusCode.NotFound)
            {
                break;
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception e)
            {
                _observer.OnError(e);
                _completed = true;
                return;
            }

            _observer.OnNext(_reader.Current);
        }

        _completed = true;
        _observer.OnCompleted();
    }

    public void Dispose()
    {
        if (!_completed && !_tokenSource.IsCancellationRequested)
        {
            _tokenSource.Cancel();
        }

        _tokenSource.Dispose();
        _task.Dispose();
    }

}

现在只需一个简单的扩展方法,用于通过流读取器创建可观察对象。

namespace Grpc.Core;
public static class AsyncStreamReaderObservableExtensions
{
    public static IObservable<T> AsObservable<T>(
        this IAsyncStreamReader<T> reader,
        CancellationToken cancellationToken = default) =>
        new GrpcStreamObservable<T>(reader, cancellationToken);
}

摘要

IAsyncEnumerable<T>IObservable<T> 模型都是 .NET 中处理异步数据流的受到良好支持和记录的方法。 gRPC 流可很好地映射到这两种范例,从而提供与 .NET 的紧密集成以及反应式和异步编程样式。