你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Java 的 Azure 事件中心 检查点存储客户端库 - 版本 1.17.0
使用存储 Blob
Azure 事件中心检查点存储可用于在处理来自 Azure 事件中心 的事件时存储检查点。
此包使用存储 Blob 作为永久性存储来维护检查点和分区所有权信息。
BlobCheckpointStore
此包中提供的 可以插入 到 EventProcessor
。
入门
先决条件
- Java 开发工具包 (JDK) 8 或更高版本。
- Maven
- Microsoft Azure 订阅
- 可以在以下位置创建免费帐户: https://azure.microsoft.com
- Azure 事件中心 实例
- Azure 存储帐户
添加包
包括 BOM 文件
请将 azure-sdk-bom 包含在项目中,以依赖于库的正式发布 (GA) 版本。 在以下代码段中,将 {bom_version_to_target} 占位符替换为版本号。 若要详细了解 BOM,请参阅 AZURE SDK BOM 自述文件。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
然后在依赖项部分中包括直接依赖项,不带版本标记,如下所示。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
</dependencies>
包括直接依赖项
如果要依赖于 BOM 中不存在的特定库版本,请将直接依赖项添加到项目,如下所示。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.17.0</version>
</dependency>
对存储容器客户端进行身份验证
若要创建 的 BlobCheckpointStore
实例, ContainerAsyncClient
应首先使用具有写入访问权限和连接字符串的相应 SAS 令牌创建 。 为此,需要帐户 SAS (存储帐户字符串) 共享访问签名。 有关详细信息,请参阅 SAS 令牌。
关键概念
检查点
检查点是读取者在分区事件序列中标记或提交其位置时执行的过程。 检查点操作由使用者负责,并在使用者组中的每个分区上进行。 这种责任意味着,对于每个使用者组而言,每个分区读取者必须跟踪它在事件流中的当前位置,当它认为数据流已完成时,可以通知服务。 如果读取者与分区断开连接,当它重新连接时,将开始读取前面由该使用者组中该分区的最后一个读取者提交的检查点。 当读取者建立连接时,它会将此偏移量传递给事件中心,以指定要从其开始读取数据的位置。 这样,用户便可以使用检查点将事件标记为已由下游应用程序“完成”,并且在不同计算机上运行的读取者之间发生故障转移时,还可以提供弹性。 若要返回到较旧的数据,可以在此检查点过程中指定较低的偏移量。 借助此机制,检查点可以实现故障转移复原和事件流重放。
&偏移序列号
两个偏移 & 序列号都表示事件在分区中的位置。 可以将它们视为客户端游标。 偏移量是事件的字节编号。 偏移量/序列号使事件使用者 (读取者) 可以指定事件流中要从中开始读取事件的点。 可以指定时间戳,以便接收仅在给定时间戳之后排队的事件。 使用者负责在事件中心服务外部存储自己的偏移值。 在分区中,每个事件都包含偏移量、序列号和排队时的时间戳。
示例
使用 SAS 令牌创建存储容器的实例
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
使用事件处理程序客户端使用事件
若要使用事件中心的所有分区的事件,需要为特定使用者组创建 EventProcessorClient
。 创建事件中心时,它提供可用于入门的默认使用者组。
会将 EventProcessorClient
事件的处理委托给你提供的回调函数,使你能够专注于提供值所需的逻辑,而处理器负责管理基础使用者操作。
在我们的示例中,我们将重点介绍如何生成 EventProcessor
、使用 BlobCheckpointStore
和简单的回调函数来处理从事件中心接收的事件、写入控制台并在每个事件后更新 Blob 存储中的检查点。
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
故障排除
启用客户端日志记录
Azure SDK for Java 提供了一致的日志记录故事,可帮助排查应用程序错误并加快解决。 生成的日志会在到达终端状态之前捕获应用程序的流,以帮助查找根本问题。 有关启用 日志记录 的指南,请查看日志记录 Wiki。
默认 SSL 库
默认情况下,所有客户端库均使用 Tomcat 原生 Boring SSL 库来为 SSL 操作启用原生级别性能。 Boring SSL 库是一个 uber jar,其中包含适用于 Linux/macOS/Windows 的原生库。与 JDK 内的默认 SSL 实现相比,它提供更好的性能。 有关详细信息(包括如何减小依赖项大小),请参阅 Wiki 的性能优化部分。
后续步骤
首先浏览 此处的示例。
贡献
如果你想成为此项目的活跃参与者,请参阅我们的贡献指南了解详细信息。