Azure SDK for Java 中的异步编程

本文介绍 Azure SDK for Java 中的异步编程模型。

Azure SDK 最初仅包含用于与 Azure 服务交互的非阻塞异步 API。 通过这些 API,可以使用 Azure SDK 生成高效使用系统资源的可缩放应用程序。 但是, 用于 Java 的 Azure SDK 还包含同步客户端,以满足更广泛的受众需求,并使我们的客户端库对不熟悉异步编程的用户而言是可接近的。 请参阅 Azure SDK 设计指南中的 易接近性)。因此,Azure SDK for Java 中的所有 Java 客户端库都提供同步和异步客户端。 但是,我们建议将异步客户端用于生产系统,以最大程度地利用系统资源。

反应流

如果你在 Java Azure SDK 设计指南中查看异步服务客户端部分,你会注意到,我们的异步 API 使用反应类型,而不是使用 CompletableFuture Java 8 提供的。 为什么我们选择反应类型而不是在 JDK 中本机可用的类型?

Java 8 引入了 LambdaCompletableFuture 等功能。 这些功能提供了许多功能,但存在一些限制。

CompletableFuture 提供基于回调的非阻塞功能,而 CompletionStage 接口允许轻松组合一系列异步操作。 Lambda 使这些基于推送的 API 更具可读性。 流提供用于对数据元素集合进行处理的函数式操作。 但是,流是同步的,不能重复使用。 CompletableFuture 允许你发出单个请求,提供对回调的支持,并且需要 单个 响应。 但是,许多云服务需要能够流式传输数据 - 例如事件中心。

响应式流可以通过从源头向订阅者传输元素来帮助克服这些限制。 当订阅者从源请求数据时,源会将任意数量的结果发送回。 不需要将它们一次性全部发送。 每当源有要发送的数据时,传输就会在一段时间内发生。

在此模型中,订阅者注册事件处理程序以在数据到达时处理数据。 这些基于推送的交互通过不同的信号通知订阅者:

  • 调用 onSubscribe() 指示数据传输即将开始。
  • 调用 onError() 指示存在错误,这也会标记数据传输的末尾。
  • 调用 onComplete() 指示数据传输成功完成。

与 Java 流不同,反应式流将错误视为一流事件。 反应流有一个专用通道,用于源向订阅者传达任何错误。 此外,反应流允许订阅者协商数据传输速率,以将这些流转换为推拉模型。

反应式流规范为数据传输的发生方式提供了标准。 概括而言,规范定义了以下四个接口,并指定了如何实现这些接口的规则。

  • 发布服务器 是数据流的源。
  • 订阅者 是数据流的使用者。
  • 订阅 管理发布服务器和订阅服务器之间的数据传输状态。
  • 处理器 既是发布服务器,也是订阅服务器。

有一些已知的 Java 库提供此规范的实现,例如 RxJavaAkka StreamsVert.xProject Reactor

用于 Java 的 Azure SDK 采用 Project Reactor 提供其异步 API。 推动这一决定的主要因素是提供与 Spring Webflux 的顺利集成,后者也使用 Project Reactor。 选择 Project Reactor 而非 RxJava 的另一个因素是,Project Reactor 使用 Java 8,但当时的 RxJava 仍处于 Java 7。 Project Reactor 还提供一组丰富的运算符,这些运算符是可组合的,可用于编写声明性代码来生成数据处理管道。 Project Reactor 的另一件好事是,它具有将 Project Reactor 类型转换为其他常用实现类型的适配器。

比较同步操作和异步操作的 API

我们讨论了同步客户端和异步客户端的选项。 下表总结了使用这些选项设计的 API 的外观:

API 类型 无值 单值 多个值
标准 Java - 同步 API void T Iterable<T>
标准 Java - 异步 API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
反应式流接口 Publisher<Void> Publisher<T> Publisher<T>
项目反应器实现反应流 Mono<Void> Mono<T> Flux<T>

为了完整起见,值得一提的是,Java 9 引入了包含四个反应式流接口的 Flow 类。 但是,此类不包含任何实现。

在 Azure SDK for Java 中使用异步 API

反应式流规范不区分发布者类型。 在反应式流规范中,发布者只需生成零个或多个数据元素。 在许多情况下,发布者最多生成一个数据元素与生成零个或多个数据元素的发布者之间有一个有用的区别。 在基于云的 API 中,此区别指示请求是返回单值响应还是集合。 Project Reactor 提供两种类型,用于区分 MonoFlux。 返回一个 Mono API 将包含最多包含一个值的响应,返回一 Flux 个 API 将包含一个包含零个或多个值的响应。

例如,假设使用 ConfigurationAsyncClient 检索使用 Azure 应用配置服务存储的配置。 (有关详细信息,请参阅 什么是 Azure 应用配置?

如果在客户端上创建 ConfigurationAsyncClient 并调用 getConfigurationSetting() ,它将返回一个 Mono指示响应包含单个值。 但是,单独调用此方法不会产生任何作用。 客户端尚未向 Azure 应用配置服务发出请求。 在此阶段,此 API 返回的 Mono<ConfigurationSetting> 只是数据处理管道的“组合”。 这意味着使用数据所需的设置已完成。 若要实际触发数据传输(即向服务发出请求并获取响应),必须订阅返回 Mono的数据。 因此,处理这些反应式流时,必须记住调用 subscribe() ,因为在执行该操作之前不会发生任何事情。

以下示例演示如何订阅 Mono 配置值并将其打印到控制台。

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

请注意,在客户端上调用 getConfigurationSetting() 后,示例代码会订阅结果,并提供三个单独的 lambda 表达式。 第一个 lambda 使用从服务接收的数据,该数据在成功响应时触发。 如果在检索配置时出错,则会触发第二个 lambda 函数。 当数据流完成时,将调用第三个 lambda,这意味着此流中不需要更多数据元素。

注释

与所有异步编程一样,创建订阅后,执行会照常进行。 如果没有任何东西保持程序处于活动状态并继续执行,它可能会在异步操作完成之前终止。 调用 subscribe() 的主要线程不会等待您对 Azure 应用配置进行网络调用并接收响应。 在生产系统中,您可能会继续处理其他内容,但在此示例中,您可以通过调用 Thread.sleep() 或使用 CountDownLatch 来添加一个小延迟,以便让异步操作有机会完成。

如以下示例所示,返回的 Flux API 也遵循类似的模式。 区别在于,提供给 subscribe() 方法的第一个回调被多次调用,每次调用针对响应中的每个数据元素。 错误或完成回调仅被调用一次,并被视为终结信号。 如果从发布者接收到其中任何一个信号,则不会调用其他回调。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

反压力

当源以比订阅者可以处理更快的速度生成数据时会发生什么情况? 订阅者可能会被过多的数据淹没,从而导致内存不足的错误。 订阅者需要一种方式来与发布者沟通,以便在无法跟上时放慢速度。 默认情况下,当您在subscribe()上调用Flux时,如上例所示,订阅者请求一个无限的数据流,指示发布者尽快发送数据。 此行为并不总是理想的,订阅者可能必须通过"背压"来控制发布的速率。 反压允许订阅者控制数据元素的流。 订阅者将请求其可以处理的有限数量的数据元素。 订阅服务器完成处理这些元素后,订阅者可以请求更多内容。 通过使用反压,可以将数据传输的推送模式转变为推拉模式。

以下示例演示如何控制事件中心使用者接收事件的速率:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

当订阅服务器首次“连接”到发布服务器时,发布者会向订阅者发送一个 Subscription 实例,该实例管理数据传输的状态。 此 Subscription 是订阅者可以通过调用 request() 来施加反压的媒介,以指定它可以处理多少个数据元素。

例如,onNext()如果订阅服务器每次调用request(10)时请求多个数据元素,则发布者会在可用或可用时立即发送接下来的 10 个元素。 这些元素累积在订阅方的缓冲区中,由于每个 onNext() 调用将请求 10 个更多,积压持续增加,直到发布者没有要发送的数据元素或订阅者的缓冲区溢出,从而导致内存溢出错误。

取消订阅

订阅管理发布服务器和订阅服务器之间的数据传输状态。 在发布者完成向订阅服务器传输所有数据或订阅者不再有兴趣接收数据之前,订阅将处于活动状态。 有几种方法可以取消订阅,如下所示。

以下示例通过处置订阅者来取消订阅:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

以下示例通过在cancel()上调用Subscription方法取消订阅:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

结论

线程是昂贵的资源,不应该浪费它们来等待远程服务调用的响应。 随着微服务体系结构的采用不断增加,缩放和有效利用资源的需求变得至关重要。 在网络绑定操作的情况下,异步 API 是有利的。 Azure SDK for Java 为异步作提供了一组丰富的 API,以帮助最大化系统资源。 我们强烈建议你试用异步客户端。

有关最适合您特定任务的运算符的详细信息,请参阅 Reactor 3 参考指南中的我需要哪个运算符?

后续步骤

现在,你已更好地了解了各种异步编程概念,因此学习如何迭代结果是非常重要的。 有关最佳迭代策略以及分页工作原理的详细信息,请参阅 Azure SDK for Java 中的分页和迭代