你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 事件中心 .NET 客户端库 - 版本 5.9.3

Azure 事件中心是一种高度可缩放的发布-订阅服务,每秒可引入数百万个事件并将其流式传输到多个使用者。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 事件中心收集数据后,可以使用任何实时分析提供程序或批处理/存储适配器来检索、转换和存储数据。 如果想要详细了解Azure 事件中心,可以查看:什么是事件中心

使用 Azure 事件中心客户端库可发布和使用 Azure 事件中心事件,还可以:

  • 出于商业智能和诊断目的,发出有关应用程序的遥测数据。

  • 发布有关应用程序状态的信息,相关方可能会需要观察该状态并将其视为采取措施的触发器。

  • 观察业务或其他生态系统内发生的重要操作和交互,使松散耦合的系统无需结合在一起即可相互交互。

  • 接收来自一个或多个发布者的事件,对其进行转换以更好地满足生态系统的需求,然后将转换后的事件发布到新流供使用者观察。

源代码 | 包 (NuGet) | API 参考文档 | 产品文档 | 迁移指南 | 故障排除指南

入门

先决条件

  • Azure 订阅:若要使用 Azure 服务(包括 Azure 事件中心),需要订阅。 如果没有现有的 Azure 帐户,可以在创建帐户时注册免费试用版或使用 Visual Studio 订阅权益。

  • 包含事件中心的事件中心命名空间:若要与Azure 事件中心交互,还需要提供命名空间和事件中心。 如果不熟悉如何创建 Azure 资源,可能需要按照使用 Azure 门户创建事件中心的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • C# 8.0:Azure 事件中心客户端库使用 C# 8.0 中引入的新功能。 为了利用 C# 8.0 语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译, 语言版本latest

    希望充分利用 C# 8.0 语法的 Visual Studio 用户需要使用 Visual Studio 2019 或更高版本。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。 Visual Studio 2017 的用户可以通过使用 Microsoft.Net.Compilers NuGet 包 和设置语言版本来利用 C# 8 语法,但编辑体验可能并不理想。

    你仍然可以将库与以前的 C# 语言版本一起使用,但需要手动管理异步可枚举和异步一次性成员,而不是受益于新语法。 仍可能面向 .NET Core SDK 支持的任何框架版本,包括早期版本的 .NET Core 或 .NET Framework。 有关详细信息,请参阅: 如何指定目标框架
    重要说明: 为了在不修改的情况下生成或运行 示例示例 ,必须使用 C# 11.0。 如果决定针对其他语言版本调整示例,仍可以运行这些示例。 示例中提供了执行此操作的示例: 早期语言版本

若要在 Azure 中快速创建一组基本的事件中心资源并接收它们的连接字符串,可以通过单击以下方法部署示例模板:

部署到 Azure

安装包

使用 NuGet 安装适用于 .NET 的 Azure 事件中心 客户端库:

dotnet add package Azure.Messaging.EventHubs

验证客户端

若要使事件中心客户端库与事件中心交互,需要了解如何与之连接和授权。 执行此操作的最简单方法是使用连接字符串,该字符串是在创建事件中心命名空间时自动创建的。 如果不熟悉将连接字符串与事件中心配合使用,可能需要按照分步指南 获取事件中心连接字符串

有了连接字符串后,可以使用它创建任何事件中心客户端类型:

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using var producer = new EventHubProducerClient(connectionString, eventHubName);

有关使用凭据类型对事件中心客户端进行身份验证的示例,请参阅 使用 Azure Active Directory (AAD) 主体标识和共享访问凭据 示例。

有关为 ASP.NET Core应用程序对事件中心客户端进行身份验证的示例,请参阅注册 ASP.NET Core依赖项注入

关键概念

  • 事件中心客户端是与事件中心客户端库交互的开发人员的主要接口。 存在几个不同的事件中心客户端,每个客户端都专用于事件中心的特定用途,例如发布或使用事件。

  • 事件中心生成者是这样一种客户端:它充当遥测数据、诊断信息、使用日志或其他日志数据的源,同时作为嵌入式设备解决方案、移动设备应用程序、控制台或其他设备上运行的游戏标题、某些基于客户端或基于服务器的业务解决方案或网站的一部分。

  • 事件中心使用者是这样一种客户端:它从事件中心读取信息,并允许对其进行处理。 处理过程可能涉及聚合、复杂的计算和筛选, 也可能涉及以原始或转换方式分发或存储信息。 事件中心使用者通常是具有内置分析功能(如 Azure 流分析、Apache Spark 或 Apache Storm)的强大的大规模平台基础结构部件。

  • 分区是事件中心内保留的有序事件。 分区是一种与事件使用者所需的并行度关联的数据组织方式。 Azure 事件中心通过分区的使用者模式提供消息流,在此模式下,每个使用者只读取消息流的特定子集或分区。 当较新的事件到达时,它们将添加到此序列的末尾。 分区数量在创建事件中心时指定,无法更改。

  • 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调、从自身立场独立读取流。 每个使用者组的分区上最多可以有 5 个并发读取者,但建议给定分区和使用者组配对只有一个活动的使用者。 每个活动的读取者都会从其分区接收所有事件;如果同一个分区有多个读取者,他们将接收重复的事件。

有关更多概念和更深入的讨论,请参阅: 事件中心功能

客户端生存期

每个事件中心客户端类型都可以安全地缓存,并在应用程序的生存期内作为单一实例使用,这是在定期发布或读取事件时最佳做法。 客户端负责高效管理网络、CPU 和内存使用,努力在非活动期间保持低使用率。 CloseAsync若要确保正确清理网络资源和其他非托管对象,需要在客户端上调用 或 DisposeAsync

线程安全

我们保证所有客户端实例方法都是线程安全的,并且相互独立, (准则) 。 这可确保重用客户端实例的建议始终是安全的,即使跨线程也是如此。

数据模型类型(如 EventDataEventDataBatch )不是线程安全的。 它们不应跨线程共享,也不应与客户端方法同时使用。

其他概念

客户端选项 | 处理失败 | 诊断 | 嘲笑

示例

检查事件中心

许多事件中心操作都在特定分区范围内进行。 由于分区由事件中心拥有,因此在创建时将分配名称。 要了解哪些分区可用,可以使用一个事件中心客户端查询事件中心。 为了举例说明,在这些示例中演示了 EventHubProducerClient,但概念和形式在客户端之间是通用的。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

将事件发布到事件中心

若要发布事件,将需要创建 EventHubProducerClient。 生成者分批发布事件,并可能请求特定分区,或允许事件中心服务决定应将事件发布到哪些分区。 当发布事件需要高度可用或事件数据应在分区之间均匀分布时,建议使用自动路由。 我们的示例将利用自动路由。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();

    if ((!eventBatch.TryAdd(new EventData("First"))) ||
        (!eventBatch.TryAdd(new EventData("Second"))))
    {
       throw new ApplicationException("Not all events could be added to the batch!");
    }

    await producer.SendAsync(eventBatch);
}

从事件中心读取事件

若要从事件中心读取事件,将需要为给定的使用者组创建 EventHubConsumerClient。 创建事件中心时,它会提供一个默认的使用者组,可用来开始探索事件中心。 在我们的示例中,我们将着重使用枚举器读取已发布到事件中心的所有事件。

注意: 请务必注意,这种使用方法旨在改善探索事件中心客户端库和原型制作的体验。 建议不要用于生产情形。 对于生产用途,建议使用事件处理器客户端,因为它可以提供更强大和更高效的体验。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

从事件中心分区读取事件

若要读取事件中心分区的事件,需要为给定的使用者组创建 EventHubConsumerClient 。 创建事件中心时,它会提供一个默认的使用者组,可用来开始探索事件中心。 若要从特定分区读取数据,使用者还需要指定在事件流中的哪个位置开始接收事件;在我们的示例中,我们将重点读取事件中心第一个分区的所有已发布事件。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

使用事件处理器客户端处理事件

对于大多数生产方案,建议使用 事件处理程序客户端 来读取和处理事件。 处理器旨在提供可靠的体验,以便以高性能和容错的方式跨事件中心的所有分区处理事件,同时提供一种保持其状态的方法。 事件处理器客户端还能够在给定事件中心的使用者组上下文中协作工作,当实例对组可用或不可用时,它们将自动管理工作的分布和均衡。

由于 EventProcessorClient 依赖于 Azure 存储 blob 以保持其状态,因此需要为处理器提供 BlobContainerClient,并且该客户端已针对应使用的存储帐户和容器进行了配置。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

有关详细信息,请参阅事件处理程序客户端 自述文件 和随附 的示例

将 Active Directory 主体与事件中心客户端配合使用

Azure 标识库提供 Azure Active Directory (AAD) 身份验证支持,可用于 Azure 客户端库,包括事件中心。

若要使用 Active Directory 主体,请在创建事件中心客户端时指定库中的一个可用凭据 Azure.Identity 。 此外,提供完全限定的事件中心命名空间和所需事件中心的名称,而不是事件中心连接字符串。 为了举例说明,在这些示例中演示了 EventHubProducerClient,但概念和形式在客户端之间是通用的。

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var credential = new DefaultAzureCredential();

// It is recommended that you cache the Event Hubs clients for the lifetime of your
// application, closing or disposing when application ends.  This example disposes
// after the immediate scope for simplicity.

await using (var producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();

    if ((!eventBatch.TryAdd(new EventData("First"))) ||
        (!eventBatch.TryAdd(new EventData("Second"))))
    {
       throw new ApplicationException("Not all events could be added to the batch!");
    }

    await producer.SendAsync(eventBatch);
}

使用 Azure Active Directory 时,必须为主体分配一个角色,该角色允许访问事件中心,例如 Azure Event Hubs Data Owner 角色。 有关将 Azure Active Directory 授权与事件中心配合使用的详细信息,请参阅 相关文档

注册 ASP.NET Core依赖项注入

若要将其中一个事件中心客户端作为依赖项注入到 ASP.NET Core 应用程序中,请安装适用于 ASP.NET Core 包的 Azure 客户端库集成。

dotnet add package Microsoft.Extensions.Azure

安装后,在 方法中 Startup.ConfigureServices 注册所需的事件中心客户端类型:

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        builder.AddEventHubProducerClient(Configuration.GetConnectionString("EventHubs"));
    });
  
    services.AddControllers();
}

若要使用上述代码,请将此添加到应用程序的配置中:

{
  "ConnectionStrings": {
    "EventHubs": "<connection_string>"
  }
}

对于喜欢为其客户端使用共享 Azure.Identity 凭据的应用程序,注册看起来略有不同:

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";

public void ConfigureServices(IServiceCollection services)
{
    services.AddAzureClients(builder =>
    {
        // This will register the EventHubProducerClient using the default credential.
        builder.AddEventHubProducerClientWithNamespace(fullyQualifiedNamespace);

        // By default, DefaultAzureCredential is used, which is likely desired for most
        // scenarios. If you need to restrict to a specific credential instance, you could
        // register that instance as the default credential instead.
        builder.UseCredential(new ManagedIdentityCredential());
    });
  
    services.AddControllers();
}

有关详细信息,请参阅 使用 Azure SDK for .NET 进行依赖关系注入

疑难解答

有关详细的故障排除信息,请参阅 事件中心故障排除指南

日志记录和诊断

完全检测事件中心客户端库,以便使用 .NET EventSource 发出信息来记录不同详细级别的信息。 日志记录针对每个操作执行,并遵循标记操作的起点、操作完成和遇到的任何异常的模式。 可能提供见解的其他信息也会记录在关联操作的上下文中。

可以通过选择加入名为“Azure-Messaging-EventHubs”的源或选择加入具有“AzureEventSource”特征的所有源来获取 EventListener 事件中心客户端日志。 为了更轻松地从 Azure 客户端库捕获日志, Azure.Core 事件中心使用的库提供了 AzureEventSourceListener。 有关详细信息,请参阅 使用 AzureEventSourceListener 捕获事件中心日志

还可以使用 Application Insights 或 OpenTelemetry 检测事件中心客户端库进行分布式跟踪。 有关详细信息,请参阅 Azure.Core 诊断示例

后续步骤

除了讨论的介绍性方案之外,Azure 事件中心客户端库还提供对其他方案的支持,以帮助利用Azure 事件中心服务的完整功能集。 为了帮助探索其中一些方案,事件中心客户端库提供了一个示例项目,作为常见方案的插图。 有关详细信息,请参阅示例 自述文件

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

有关详细信息,请参阅 我们的贡献指南

曝光数