你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Java 的Azure 事件中心客户端库 - 版本 5.16.0

Azure 事件中心是一种高度可缩放的发布-订阅服务,每秒可以引入数百万个事件并将其流式传输到多个使用者。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 事件中心收集数据后,可以使用任何实时分析提供程序或批处理/存储适配器来检索、转换和存储数据。 如果想要了解有关Azure 事件中心的详细信息,请查看:什么是事件中心

使用 Azure 事件中心客户端库可发布和使用 Azure 事件中心事件,还可以:

  • 出于商业智能和诊断目的,发出有关应用程序的遥测数据。
  • 发布有关应用程序状态的信息,相关方可能会需要观察该状态并将其视为采取措施的触发器。
  • 观察业务或其他生态系统内发生的重要操作和交互,使松散耦合的系统无需结合在一起即可相互交互。
  • 接收来自一个或多个发布者的事件,对其进行转换以更好地满足生态系统的需求,然后将转换后的事件发布到新流供使用者观察。

源代码 | API 参考文档 | 产品文档 | 样品 | 故障 排除

目录

入门

先决条件

添加包

包括 BOM 文件

请将 azure-sdk-bom 包含在项目中,以依赖于库的正式发布 (GA) 版本。 在以下代码段中,将 {bom_version_to_target} 占位符替换为版本号。 若要详细了解 BOM,请参阅 AZURE SDK BOM 自述文件

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

然后在 dependencies 节中包含直接依赖项,不带版本标记,如下所示。

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

包括直接依赖项

如果要依赖于 BOM 中不存在的特定版本的库,请将直接依赖项添加到项目中,如下所示。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

验证客户端

若要使事件中心客户端库与事件中心交互,需要了解如何与其连接和授权。

使用连接字符串创建事件中心生成者

执行此操作的最简单方法是使用连接字符串,该字符串是在创建事件中心命名空间时自动创建的。 如果不熟悉 Azure 中的共享访问策略,可能需要按照分步指南 获取事件中心连接字符串

可以使用 创建 EventHubClientBuilder异步和同步事件中心生成者和使用者客户端。 调用 build*Client() 会创建同步生成者或使用者,同时 build*AsyncClient() 创建其异步对应方。

以下代码片段创建同步事件中心生成者。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

使用以前的 Azure Active Directory Microsoft 标识平台 () 创建事件中心客户端

Azure SDK for Java 支持 Azure 标识包,因此可以轻松地从Microsoft 标识平台获取凭据。 首先,添加包:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

请求凭据的所有实现方式都可以在 包下 com.azure.identity.credential 找到。 下面的示例演示如何使用 Azure Active Directory (AAD) 应用程序客户端密码通过 Azure 事件中心 授权。

使用 DefaultAzureCredential 授权

使用 DefaultAzureCredential 进行授权最简单。 它查找要在其运行环境中使用的最佳凭据。 有关将 Azure Active Directory 授权与事件中心配合使用的详细信息,请参阅 相关文档

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

关键概念

  • 事件中心生成者是遥测数据、诊断信息、使用情况日志或其他日志数据的源,作为嵌入式设备解决方案、移动设备应用程序、在主机或其他设备上运行的游戏、某些基于客户端或服务器的业务解决方案或网站的一部分。

  • 事件中心使用者从事件中心获取此类信息并对其进行处理。 处理可能涉及聚合、复杂计算和筛选。 也可能涉及以原始或转换方式分发或存储信息。 事件中心使用者通常是具有内置分析功能(如 Azure 流分析、Apache Spark 或 Apache Storm)的强大的大规模平台基础结构部件。

  • 分区是事件中心内保留的有序事件。 Azure 事件中心通过分区的使用者模式提供消息流,在此模式下,每个使用者只读取消息流的特定子集或分区。 当较新的事件到达时,它们将添加到此序列的末尾。 分区数量在创建事件中心时指定,无法更改。

  • 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调、从自身立场独立读取流。 每个使用者组的分区上最多可以有 5 个并发读取者,但建议给定分区和使用者组配对只有一个活动的使用者。 每个活动读取器从其分区接收事件;如果同一分区上有多个读取器,则它们将收到重复事件。

有关更多概念和更深入的讨论,请参阅: 事件中心功能。 此外,AMQP 的概念在 OASIS 高级消息队列协议 (AMQP) 版本 1.0 中进行了充分阐述。

示例

将事件发布到事件中心

若要发布事件,需要创建异步 EventHubProducerAsyncClient 或同步 EventHubProducerClient。 每个生成者可以将事件发送到特定分区,或者允许事件中心服务决定应发布到哪些分区事件。 当发布事件需要高度可用或事件数据应在分区之间均匀分布时,建议使用自动路由。

创建事件中心生成者并发布事件

开发人员可以使用 并调用 buildProducer*Client()创建生成者EventHubClientBuilder。 指定 CreateBatchOptions.setPartitionId(String) 会将事件发送到特定分区。 如果未 partitionId 指定 ,则事件会自动路由到分区。 指定 CreateBatchOptions.setPartitionKey(String) 将告知事件中心服务对事件进行哈希处理,并将其发送到同一分区。

以下代码片段创建同步生成者并将事件发送到任何分区,从而允许事件中心服务将事件路由到可用分区。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

请注意, EventDataBatch.tryAdd(EventData) 不是线程安全的。 请确保在使用多个线程添加事件时同步方法访问。

使用分区标识符发布事件

许多事件中心操作都在特定分区范围内进行。 任何客户端都可以调用 getPartitionIds()getEventHubProperties() 来获取其事件中心实例中的分区 ID 和元数据。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

使用分区键发布事件

当一组事件未与任何特定分区关联时,可能需要请求事件中心服务在同一分区上保留不同的事件或一批事件。 这可以通过在发布事件时设置 partition key 来实现。 在下面的方案中,所有事件都与城市相关,因此发送这些事件时,分区键设置为“cities”。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

使用事件中心分区中的事件

若要使用事件,请 EventHubConsumerAsyncClient 为特定的使用者组创建 或 EventHubConsumerClient 。 此外,使用者需要指定在事件流中的哪个位置开始接收事件。

通过 EventHubConsumerAsyncClient 使用事件

在以下代码片段中,我们将创建一个异步使用者,该使用者从 partitionId 接收事件,并仅侦听推送到分区的最新事件。 开发人员可以通过使用另一个分区 ID 调用 receiveFromPartition(String, EventPosition) 来开始使用同一EventHubConsumerAsyncClient个 从多个分区接收事件。

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

通过 EventHubConsumerClient 使用事件

开发人员可以使用 创建一个同步使用者,该使用者以 EventHubConsumerClient批处理方式返回事件。 在下面的代码片段中,创建了一个使用者,该使用者从分区的事件流的开头开始读取事件。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

使用 EventProcessorClient 使用事件

若要使用事件中心的所有分区的事件,可以为特定的使用者组创建 EventProcessorClient

EventProcessorClient会将事件的处理委托给你提供的回调函数,使你能够专注于提供值所需的逻辑,而处理器负责管理基础使用者操作。

在本示例中,我们将重点介绍如何生成 EventProcessorClient、使用 SampleCheckpointStore 示例中提供的 ,以及处理从事件中心接收的事件并写入控制台的回调函数。 对于生产应用程序,建议将持久存储(如 Checkpoint Store)与 Azure 存储 Blob 配合使用

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

疑难解答

请参阅 TROUBLESHOOTING.md

后续步骤

除了讨论的内容之外,Azure 事件中心客户端库还支持许多其他方案,以利用 Azure 事件中心 服务的完整功能集。 若要浏览其中一些方案,检查示例自述文件

贡献

如果你想成为此项目的活动参与者,请参阅我们的贡献指南了解详细信息。

曝光数