Azure SDK for Java 中的异步编程

本文介绍 Azure SDK for Java 中的异步编程模型。

Azure SDK 最初仅包含用于与 Azure 服务交互的非阻塞异步 API。 通过这些 API,可以使用 Azure SDK 生成高效使用系统资源的可缩放应用程序。 不过,Azure SDK for Java 也包含同步客户端,以面向更广泛的用户群体,并让不熟悉异步编程的用户也能更容易使用这些客户端库。 请参阅 Azure SDK 设计指南中的 易接近性)。因此,Azure SDK for Java 中的所有 Java 客户端库都提供同步和异步客户端。 但是,将异步客户端用于生产系统,以最大限度地使用系统资源。

Azure Java SDK 中的响应式流

如果你查看 Java Azure SDK 设计指南中的 Async Service Clients 部分,你会发现异步 API 使用的是响应式类型,而不是 Java 8 提供的 CompletableFuture。 为什么Azure SDK团队选择反应类型而不是 JDK 中本机可用的类型?

Java 8 引入了 LambdaCompletableFuture 等功能。 这些功能提供了许多功能,但它们有一些限制。

CompletableFuture 提供基于回调的非阻塞功能,并且 CompletionStage 该接口可以轻松编写一系列异步操作。 Lambda 使这些基于推送的 API 更具可读性。 流提供用于对数据元素集合进行处理的函数式操作。 但是,流是同步的,不能重复使用。 CompletableFuture 允许你发出单个请求,提供对回调的支持,并且需要 单个 响应。 但是,许多云服务需要能够流式传输数据 - 例如事件中心。

响应式流可以通过将元素从源持续传输到订阅者来帮助克服这些限制。 当订阅者从源请求数据时,源会将任意数量的结果发送回。 不需要将它们一次性全部发送。 每当源有要发送的数据时,传输就会在一段时间内发生。

在此模型中,订阅者注册事件处理程序以在数据到达时处理数据。 这些基于推送的交互通过不同的信号通知订阅者:

  • 调用 onSubscribe() 指示数据传输即将开始。
  • 调用 onError() 指示存在错误,这也会标记数据传输的末尾。
  • 调用 onComplete() 指示数据传输成功完成。

与 Java 流不同,反应式流将错误视为一流事件。 反应流有一个专用通道,用于源向订阅者传达任何错误。 此外,反应流允许订阅者协商数据传输速率,以将这些流转换为推拉模型。

反应式流规范为数据传输的发生方式提供了标准。 概括而言,规范定义了以下四个接口,并指定了如何实现这些接口的规则。

  • 发布服务器 是数据流的源。
  • 订阅者 是数据流的使用者。
  • 订阅 管理发布服务器和订阅服务器之间的数据传输状态。
  • 处理器 既是发布服务器,也是订阅服务器。

一些已知的Java库提供此规范的实现, 例如 RxJavaAkka StreamsVert.xProject Reactor

用于 Java 的 Azure SDK 采用 Project Reactor 提供其异步 API。 推动这一决定的主要因素是提供与 Spring Webflux 的顺利集成,后者也使用 Project Reactor。 选择 Project Reactor 而非 RxJava 的另一个因素是,Project Reactor 使用 Java 8,但当时的 RxJava 仍处于 Java 7。 Project Reactor 还提供一组丰富的运算符,这些运算符是可组合的,可用于编写声明性代码来生成数据处理管道。 Project Reactor 的另一件好事是,它具有将 Project Reactor 类型转换为其他常用实现类型的适配器。

比较同步和异步操作的 API

你了解了同步客户端以及异步客户端的相关选项。 下表总结了使用这些选项设计的 API 的外观:

API 类型 无值 单值 多个值
标准 Java - 同步 API void T Iterable<T>
标准 Java - 异步 API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
反应式流接口 Publisher<Void> Publisher<T> Publisher<T>
项目反应器实现反应流 Mono<Void> Mono<T> Flux<T>

为了完整起见,值得一提的是,Java 9 引入了包含四个反应式流接口的 Flow 类。 但是,此类不包含任何实现。

在 Azure SDK for Java 中使用异步 API

反应式流规范不区分发布者类型。 在反应式流规范中,发布者只需生成零个或多个数据元素。 在许多情况下,发布者最多生成一个数据元素与生成零个或多个数据元素的发布者之间有一个有用的区别。 在基于云的 API 中,此区别指示请求是返回单值响应还是集合。 Project Reactor 提供两种类型,用于区分 MonoFlux。 返回一个 Mono API 将包含最多包含一个值的响应,返回一 Flux 个 API 将包含一个包含零个或多个值的响应。

例如,假设使用 ConfigurationAsyncClient 检索使用 Azure 应用配置服务存储的配置。 (有关详细信息,请参阅 什么是 Azure 应用配置?

如果在客户端上创建 ConfigurationAsyncClient 并调用 getConfigurationSetting() ,它将返回一个 Mono指示响应包含单个值。 但是,单独调用此方法不会产生任何作用。 客户端尚未向Azure 应用程序配置服务发出请求。 在此阶段,此 API 返回的 Mono<ConfigurationSetting> 只是数据处理管道的“组合”。 此体系结构意味着使用数据所需的设置已完成。 若要实际触发数据传输(即向服务发出请求并获取响应),必须订阅返回 Mono的数据。 因此,处理这些反应式流时,必须记住调用 subscribe() ,因为在执行该操作之前不会发生任何事情。

以下示例演示如何订阅 Mono 配置值并将其打印到控制台。

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

请注意,在客户端上调用 getConfigurationSetting() 后,示例代码会订阅结果,并提供三个单独的 lambda 表达式。 第一个 lambda 使用从服务接收的数据,该数据在成功响应时触发。 如果在获取配置时出错,则会触发第二个 Lambda 函数。 当数据流完成时,将调用第三个 lambda,这意味着此流中不需要更多数据元素。

注释

与所有异步编程一样,创建订阅后,执行会照常进行。 如果没有任何内容使程序保持活动并执行,则它可能会在异步操作完成之前终止。 调用 subscribe() 的主线程不会一直等待你向 Azure 应用程序配置 发出网络调用并收到响应。 在生产系统中,您可能会继续处理其他内容,但在此示例中,您可以通过调用 Thread.sleep() 或使用 CountDownLatch 来添加一个小延迟,以便让异步操作有机会完成。

如以下示例所示,返回的 Flux API 也遵循类似的模式。 区别在于,提供给 subscribe() 方法的第一个回调被多次调用,每次调用针对响应中的每个数据元素。 错误或完成回调仅被调用一次,并被视为终结信号。 如果从发布者接收到其中任何一个信号,则不会调用其他回调。

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

反压力

当源以比订阅者可以处理更快的速度生成数据时会发生什么情况? 订阅者可能会被过多的数据淹没,从而导致内存不足的错误。 订阅者需要一种方式来与发布者沟通,以便在无法跟上时放慢速度。 默认情况下,如前面的示例所示,当你在 Flux 上调用 subscribe() 时,订阅者会请求一个无界的数据流,这表示发布者应尽快发送数据。 这种行为并不总是理想的,订阅者可能不得不通过“背压”来控制发布速率。 反压允许订阅者控制数据元素的流。 订阅者请求其可处理的有限数量的数据元素。 订阅者完成处理这些元素后,可以请求更多内容。 通过使用反压,可以将数据传输的推送模式转变为推拉模式。

以下示例演示如何控制事件中心使用者接收事件的速率:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

当订阅服务器首次“连接”到发布服务器时,发布者会向订阅者发送一个 Subscription 实例,该实例管理数据传输的状态。 此 Subscription 是订阅者可以通过调用 request() 来施加反压的媒介,以指定它可以处理多少个数据元素。

例如,如果订阅者每次调用 onNext() 时请求多个数据元素(如 request(10)),那么发布者会立即发送接下来的 10 个元素(如果这些元素已可用),或者在这些元素可用时发送。 这些元素会累积在订阅者一端的缓冲区中,并且由于每次 onNext() 调用都会再请求 10 个元素,积压数据会持续增长,直到发布者没有更多数据元素可发送,或者订阅者的缓冲区发生溢出,从而导致内存溢出错误。

取消响应式流订阅

订阅管理发布服务器和订阅服务器之间的数据传输状态。 在发布者完成向订阅服务器传输所有数据或订阅者不再有兴趣接收数据之前,订阅将保持活动状态。 可以通过多种方式取消订阅,如以下示例所示。

以下示例通过处置订阅者来取消订阅:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

以下示例通过在cancel()上调用Subscription方法取消订阅:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

结论

线程是昂贵的资源,不应该浪费它们来等待远程服务调用的响应。 随着微服务架构的日益普及,实现高效扩展和资源高效利用变得至关重要。 在网络绑定操作的情况下,异步 API 是有利的。 Azure SDK for Java 为异步作提供了一组丰富的 API,以帮助最大化系统资源。 试用异步客户端。

有关最适合您特定任务的运算符的详细信息,请参阅 Reactor 3 参考指南中的我需要哪个运算符?

后续步骤

现在,你已更好地了解了各种异步编程概念,因此学习如何迭代结果是非常重要的。 有关最佳迭代策略以及分页工作原理的详细信息,请参阅 Azure SDK for Java 中的分页和迭代