Azure SDK for Java 中的异步编程
本文介绍 Azure SDK for Java 中的异步编程模型。
Azure SDK 最初只包含用于与 Azure 服务交互的非阻塞异步 API。 这些 API 让你能够使用 Azure SDK 生成可缩放的应用程序,从而高效地使用系统资源。 但是,Azure SDK for Java 还包含同步客户端,能够满足更广泛的受众需求,并使我们的客户端库对于不熟悉异步编程的用户来说更易于使用。 (请参阅 Azure SDK 设计指南中易于 访问。因此,Azure SDK for Java 中的所有 Java 客户端库都提供异步客户端和同步客户端。 但是,我们建议对生产系统使用异步客户端,以最大限度地利用系统资源。
反应式流
如果你查看 Java Azure SDK 设计指南中的异步服务客户端部分,你会注意到,我们的异步 API 使用的不是 Java 8 提供的 CompletableFuture
,而是反应式类型。 为什么我们选择反应式类型而不是 JDK 中本机可用的类型?
Java 8 引入了 Stream、Lambda 和 CompletableFuture 等特性。 这些特性提供了许多功能,但也存在一些局限性。
CompletableFuture
提供基于回调的非阻塞功能,并且 CompletionStage
接口能够轻松实现一系列异步操作的组合。 Lambda 使这些基于推送的 API 更具可读性。 Stream 提供功能式操作来处理数据元素的集合。 但是,流是同步的,不能重复使用。 CompletableFuture
使你能够发出单个请求,提供对回调的支持,并预期获得“单个”响应。 然而,许多云服务需要流式传输数据的能力,例如事件中心。
反应式流可以通过将元素从源流式传输到订阅服务器来帮助克服这些限制。 当订阅服务器从某个源请求数据时,源会发回任意数量的结果。 它不需要一次发送全部结果。 只要源包含要发送的数据,传输就会在一段时间内完成。
在此模型中,订阅服务器注册事件处理程序,以便在数据到达时处理数据。 这些基于推送的交互通过不同的信号通知订阅服务器:
onSubscribe()
调用表示数据传输即将开始。onError()
调用表示出现错误,这也标志着数据传输结束。onComplete()
调用表示数据传输成功完成。
与 Java 流不同,反应式流将错误视为第一类事件。 反应式流具有专用通道,供源向订阅服务器传送任何错误。 此外,反应式流允许订阅服务器协商数据传输速率,以将这些流转换为推送 - 请求模型。
反应式流规范提供了数据传输应该如何进行的标准。 概括而言,该规范定义了以下四个接口,并指定了如何实现这些接口的规则。
- “发布服务器”是数据流的源。
- “订阅服务器”是数据流的使用者。
- “订阅”管理发布服务器和订阅服务器之间的数据传输状态。
- “处理器”既是发布服务器又是订阅服务器。
有一些广为人知的 Java 库提供了此规范的实现,例如 RxJava、Akka Streams、Vert.x 和 Project Reactor。
Azure SDK for Java 采用 Project Reactor 提供异步 API。 推动做出此决定的主要因素是提供与 Spring Webflux 的平滑集成,后者也使用 Project Reactor。 选择 Project Reactor 而不是 RxJava 的另一个促成因素是 Project Reactor 使用 Java 8,但 RxJava 当时仍使用 Java 7。 Project Reactor 还提供了一组丰富的可组合运算符,让你能够编写用于生成数据处理管道的声明性代码。 Project Reactor 的另一个优点是它有适配器,可以将 Project Reactor 类型转换为其他常用的实现类型。
同步和异步操作的 API 比较
我们讨论了同步客户端和异步客户端的选项。 下表总结了使用这些选项设计的 API 的形态:
API 类型 | 无值 | 单值 | 多个值 |
---|---|---|---|
标准 Java - 同步 API | void |
T |
Iterable<T> |
标准 Java - 异步 API | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
反应式流接口 | Publisher<Void> |
Publisher<T> |
Publisher<T> |
反应式流的 Project Reactor 实现 | Mono<Void> |
Mono<T> |
Flux<T> |
为了完整起见,值得一提的是,Java 9 引入了 Flow 类,其中包含四个反应式流接口。 但是,此类不包括任何实现。
在 Azure SDK for Java 中使用异步 API
反应式流规范没有区分不同类型的发布服务器。 在反应式流规范中,发布服务器只生成零个或多个数据元素。 在许多情况下,在最多生成一个数据元素的发布服务器和生成零个或多个数据元素的发布服务器之间进行区分是很有用的。 在基于云的 API 中,这种区别表示请求是返回单值响应还是返回集合。 Project Reactor 提供了两种类型来进行这种区分:Mono 和 Flux。 返回 Mono
的 API 将包含最多一个值的响应,返回 Flux
的 API 将包含零个或多个值的响应。
例如,假设你使用 ConfigurationAsyncClient 检索使用 Azure 应用程序配置服务存储的配置。 (有关详细信息,请参阅什么是 Azure 应用程序配置?。)
如果创建 ConfigurationAsyncClient
并在客户端上调用 getConfigurationSetting()
,它将返回一个 Mono
,这表示响应包含单个值。 但是,单独调用此方法并没有任何作用。 客户端尚未向 Azure 应用程序配置服务发出请求。 在此阶段,此 API 返回的 Mono<ConfigurationSetting>
只是一个数据处理管道的“程序集”。 这意味着使用数据所需的设置已完成。 若要实际触发数据传输(即向服务发出请求并获得响应),必须订阅返回的 Mono
。 因此,在处理这些反应式流时,务必调用 subscribe()
,因为在进行调用之前不会发生任何操作。
下面的示例演示如何订阅 Mono
并将配置值打印到控制台。
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
注意,在客户端上调用 getConfigurationSetting()
之后,示例代码会订阅结果并提供三个单独的 lambda。 第一个 lambda 使用从服务接收的数据,它会在成功响应时触发。 如果检索配置时出错,则会触发第二个 lambda。 数据流完成,即意味着不再需要来自该流的数据元素时,会调用第三个 lambda。
注意
与所有异步编程一样,在创建订阅之后,执行照常进行。 如果没有任何情况可以使程序保持活动和执行状态,它可能会在异步操作完成之前终止。 调用 subscribe()
的主线程不会等到你对 Azure 应用程序配置进行网络调用并接收响应。 在生产系统中,你可能会继续处理其他内容,但在本例中,可以通过调用 Thread.sleep()
或使用 CountDownLatch
来添加一个小延迟,以便使异步操作有机会完成。
如下面的示例所示,返回 Flux
的 API 也遵循类似的模式。 不同之处在于,系统会针对响应中的每个数据元素,对提供给 subscribe()
方法的第一个回调进行多次调用。 错误或完成回调只调用一次,并被视为终端信号。 如果从发布服务器接收到这些信号中的任何一个,则不会调用其他回调。
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
背压
当源以订阅服务器无法处理的速度生成数据时,会发生什么情况? 订阅服务器可能会因数据量大而不堪重负,从而导致内存不足错误。 订阅服务器需要一种与发布服务器通信的方式,以便在无法跟上时放慢传输速度。 默认情况下,当你在 Flux
上调用 subscribe()
时,如上面的示例所示,订阅服务器正在请求一个未绑定数据流,指示发布服务器尽快发送数据。 这种行为并不总是可取的,订阅服务器可能必须通过“背压”来控制发布速率。 背压允许订阅服务器控制数据元素流。 订阅服务器将请求它们可以处理的有限数量的数据元素。 在订阅服务器处理完这些元素之后,订阅服务器可以请求更多元素。 通过使用背压,可以将用于数据传输的推送模型转换为推送 - 请求模型。
以下示例显示如何控制事件中心使用者接收事件的速率:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
当订阅服务器首次“连接”到发布服务器时,发布服务器会向订阅服务器提交一个 Subscription
实例,该实例管理数据传输的状态。 此 Subscription
是一个媒介,借助它,订阅服务器可以通过调用 request()
来应用背压,从而指定能够处理的数据元素数量。
如果订阅服务器每次调用 onNext()
(例如 request(10)
)来请求多个数据元素,则发布服务器将立即发送后 10 个数据元素(如果它们可用或变得可用)。 这些元素累积在订阅服务器端的缓冲区中,由于每次 onNext()
调用都会再请求 10 个元素,积压的元素会不断增加,直到发布服务器没有更多的数据元素发送,或者订阅服务器的缓冲区溢出,从而导致内存不足错误。
取消订阅
订阅管理发布服务器和订阅服务器之间的数据传输状态。 订阅将一直处于活动状态,直到发布服务器完成将所有数据传输到订阅服务器或订阅服务器不再接收数据为止。 可以通过多种方式来取消订阅,如下所示。
以下示例通过释放订阅服务器来取消订阅:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
以下示例通过对 Subscription
调用 cancel()
方法来取消订阅:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
结束语
线程是昂贵的资源,不应浪费在等待来自远程服务调用的响应上。 随着微服务体系结构采用率的提高,高效地缩放和使用资源的需求变得至关重要。 存在网络绑定操作时,异步 API 更为有利。 Azure SDK for Java 为异步操作提供了一组丰富的 API,可帮助你最大程度地利用系统资源。 我们强烈建议你试用异步客户端。
有关最适合你的特定任务的运算符的详细信息,请参阅 Reactor 3 参考指南中的我需要哪个运算符?。
后续步骤
现在,你已经对各种异步编程概念有了更深入的了解。接下来,学习如何对结果进行迭代也非常重要。 有关最佳迭代策略的更多信息,以及分页工作原理的详细信息,请参阅 Azure SDK for Java 中的分页和迭代。