你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 .NET 的 Azure 事件中心 事件处理程序客户端库 - 版本 5.9.3

Azure 事件中心是一种高度可缩放的发布-订阅服务,每秒可引入数百万个事件并将其流式传输到多个使用者。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 事件中心收集数据后,可以使用任何实时分析提供程序或批处理/存储适配器来检索、转换和存储数据。 如果想要详细了解Azure 事件中心,可以查看:什么是事件中心

事件处理程序客户端库是Azure 事件中心客户端库的配套,它提供一个独立的客户端,用于以适用于大多数生产方案的强大、持久且可缩放的方式使用事件。 事件处理程序是使用 Azure 存储 Blob 生成的有意见的实现,建议用于:

  • 大规模读取和处理事件中心的所有分区的事件,可复原暂时性故障和间歇性网络问题。

  • 协作处理事件,其中多个处理器在使用者组的上下文中动态分配和分担责任,在从组中添加和删除处理器时,可正常管理负载。

  • 使用 Azure 存储 Blob 作为基础数据存储,以持久方式管理要处理的检查点和状态。

源代码 | 包 (NuGet) | API 参考文档 | 产品文档 | 故障排除指南

入门

先决条件

  • Azure 订阅:若要使用 Azure 服务(包括 Azure 事件中心),需要订阅。 如果没有现有的 Azure 帐户,可以在创建帐户时注册免费试用版或使用 Visual Studio 订阅权益。

  • 包含事件中心的事件中心命名空间:若要与Azure 事件中心交互,还需要提供命名空间和事件中心。 如果不熟悉如何创建 Azure 资源,可能需要按照使用 Azure 门户创建事件中心的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • 具有 Blob 存储的 Azure 存储帐户: 若要在 Azure 存储中保留检查点并控制所有权,需要有一个 Azure 存储帐户,其中包含可用的 Blob。 用于处理器的 Azure 存储帐户应禁用软删除和 Blob 版本控制。 如果不熟悉 Azure 存储帐户,可能需要按照使用 Azure 门户创建存储帐户的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建存储帐户的详细说明。

  • Azure 存储 Blob 容器: Azure 存储中的检查点和所有权数据将写入特定容器中的 Blob。 EventProcessorClient需要现有容器,并且不会隐式创建一个容器来帮助防止意外配置错误。 建议为每个事件中心和使用者组组合使用唯一的容器。 如果不熟悉 Azure 存储容器,可以参阅有关 管理容器的文档。 可在此处找到有关使用 .NET、Azure CLI 或 Azure PowerShell 创建容器的详细说明。

  • C# 8.0:Azure 事件中心客户端库使用 C# 8.0 中引入的新功能。 为了利用 C# 8.0 语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译, 语言版本latest

    希望充分利用 C# 8.0 语法的 Visual Studio 用户需要使用 Visual Studio 2019 或更高版本。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。 Visual Studio 2017 的用户可以通过使用 Microsoft.Net.Compilers NuGet 包 和设置语言版本来利用 C# 8 语法,但编辑体验可能并不理想。

    你仍然可以将库与以前的 C# 语言版本一起使用,但需要手动管理异步可枚举和异步一次性成员,而不是受益于新语法。 仍可能面向 .NET Core SDK 支持的任何框架版本,包括早期版本的 .NET Core 或 .NET Framework。 有关详细信息,请参阅: 如何指定目标框架

    重要说明: 为了在不修改的情况下生成或运行 示例示例 ,必须使用 C# 11.0。 如果决定针对其他语言版本调整示例,仍可以运行这些示例。

若要在 Azure 中快速创建所需的资源并接收它们的连接字符串,可以通过单击以下方法部署示例模板:

部署到 Azure

安装包

使用 NuGet 安装适用于 .NET 的 Azure 事件中心 事件处理程序客户端库:

dotnet add package Azure.Messaging.EventHubs.Processor

验证客户端

获取事件中心连接字符串

若要使事件中心客户端库与事件中心交互,需要了解如何与之连接和授权。 执行此操作的最简单方法是使用连接字符串,该字符串是在创建事件中心命名空间时自动创建的。 如果不熟悉将连接字符串与事件中心配合使用,可能需要按照分步指南 获取事件中心连接字符串

获取 Azure 存储连接字符串

若要使事件处理程序客户端使用 Azure 存储 Blob 进行检查点,它需要了解如何连接到存储帐户并对其授权。 执行此操作的最简单方法是使用连接字符串,该字符串是在创建存储帐户时生成的。 如果不熟悉 Azure 中的存储帐户连接字符串授权,建议按照分步指南 配置 Azure 存储连接字符串

获得连接字符串后,请参阅 创建事件处理程序客户端 ,获取如何使用它们创建处理器的示例。

关键概念

  • 事件处理程序是一种构造,旨在管理与连接到给定事件中心相关的职责,并在特定使用者组的上下文中处理其每个分区的事件。 处理从分区读取的事件和处理发生的任何错误的行为由事件处理程序委托给你提供的代码,使逻辑能够专注于提供业务价值,而处理器处理与读取事件相关的任务、管理分区并允许状态以检查点的形式持久保存。

  • 检查点 是一个过程,通过该过程,读取器可标记并持久保存已针对分区处理的事件的位置。 检查点由使用者负责,并且发生在每个分区上,通常在特定使用者组的上下文中。 EventProcessorClient对于 ,这意味着对于使用者组和分区组合,处理器必须跟踪其在事件流中的当前位置。 如需详细信息,请参阅事件中心产品文档中的 检查点

    当事件处理程序连接时,它将开始读取该使用者组中该分区的最后一个处理器先前保留的检查点上的事件(如果存在)。 当事件处理程序读取分区中的事件并对其进行操作时,它应定期创建检查点,以便下游应用程序将事件标记为“已完成”,并在事件处理程序或托管事件的环境发生故障时提供复原能力。 如有必要,可以通过此检查点过程指定先前的偏移量来重新处理以前标记为“已完成”的事件。

  • 分区是事件中心内保留的有序事件。 分区是一种与事件使用者所需的并行度关联的数据组织方式。 Azure 事件中心通过分区的使用者模式提供消息流,在此模式下,每个使用者只读取消息流的特定子集或分区。 当较新的事件到达时,它们将添加到此序列的末尾。 分区数量在创建事件中心时指定,无法更改。

  • 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调、从自身立场独立读取流。 每个使用者组的分区上最多可以有 5 个并发读取者,但建议给定分区和使用者组配对只有一个活动的使用者。 每个活动的读取者都会从其分区接收所有事件;如果同一个分区有多个读取者,他们将接收重复的事件。

有关更多概念和更深入的讨论,请参阅: 事件中心功能

客户端生存期

EventProcessorClient可以安全地缓存,并在应用程序的生存期内用作单一实例,这是定期读取事件时的最佳做法。 客户端负责高效管理网络、CPU 和内存使用,努力在非活动期间保持低使用率。 需要调用 StopProcessingAsync 处理器上的 或 StopProcessing ,以确保正确清理网络资源和其他非托管对象。

线程安全

我们保证所有客户端实例方法都是线程安全的,并且相互独立, (准则) 。 这可确保重用客户端实例的建议始终是安全的,即使跨线程也是如此。

数据模型类型(如 EventDataEventDataBatch )不是线程安全的。 它们不应跨线程共享,也不应与客户端方法同时使用。

其他概念

客户端选项 | 事件处理程序 | 处理失败 | 诊断 | 模拟 (处理器) | 模拟 (客户端类型)

示例

创建事件处理程序客户端

由于 EventProcessorClient 依赖于 Azure 存储 blob 以保持其状态,因此需要为处理器提供 BlobContainerClient,并且该客户端已针对应使用的存储帐户和容器进行了配置。 用于配置 的 EventProcessorClient 容器必须存在。

EventProcessorClient由于 无法知道指定不存在的容器的意图,因此它不会隐式创建容器。 这可以防范配置错误的容器,从而导致恶意处理器无法共享所有权并干扰使用者组中的其他处理器。

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

配置事件和错误处理程序

若要使用 , EventProcessorClient必须提供用于事件处理和错误的处理程序。 这些处理程序被视为独立处理程序,开发人员负责确保处理程序代码中的异常得到考虑。

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

启动和停止处理

EventProcessorClient显式启动后,将在后台执行其处理,并继续执行此操作,直到显式停止。 虽然这允许应用程序代码执行其他任务,但它还负责确保在处理期间,如果没有执行其他任务,进程不会终止。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

将 Active Directory 主体与事件处理程序客户端配合使用

Azure 标识库提供 Azure Active Directory 身份验证支持,可用于 Azure 客户端库,包括事件中心和 Azure 存储。

若要使用 Active Directory 主体,请在创建事件中心客户端时指定库中的一个可用凭据 Azure.Identity 。 此外,提供完全限定的事件中心命名空间和所需事件中心的名称,以代替事件中心连接字符串。

若要将 Active Directory 主体与 Azure 存储 Blob 容器配合使用,必须在创建存储客户端时提供容器的完全限定 URL。 有关访问 Blob 存储的有效 URI 格式的详细信息,请参阅 命名和引用容器、Blob 和元数据

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

将 Azure Active Directory 与事件中心配合使用时,必须为主体分配一个角色,该角色允许从事件中心读取数据,例如 Azure Event Hubs Data Receiver 角色。 有关将 Azure Active Directory 授权与事件中心配合使用的详细信息,请参阅 相关文档

将 Azure Active Directory 与 Azure 存储配合使用时,必须为主体分配一个角色,该角色允许读取、写入和删除对 Blob(如角色) Storage Blob Data Contributor 的访问。 有关将 Active Directory 授权与 Azure 存储配合使用的详细信息,请参阅 相关文档Azure 存储授权示例

疑难解答

有关详细的故障排除信息,请参阅 事件中心故障排除指南

异常处理

事件处理程序客户端异常

事件处理程序客户端在遇到异常时都会尝试复原,并且会采取必要的操作继续处理,除非无法执行此操作。 无需开发人员执行任何操作 即可执行此操作;它本身是处理器行为的一部分。

为了让开发人员有机会检查和响应事件处理程序客户端操作中发生的异常,它们通过 ProcessError 事件显示。 此事件的参数提供有关异常及其观察到的上下文的详细信息。 开发人员可以从此事件处理程序内部对事件处理程序客户端执行正常操作,例如停止和/或重新启动它以响应错误,但可能不会以其他方式影响处理器的异常行为。

有关实现错误处理程序的基本示例,请参阅示例: 事件处理程序处理程序

事件处理程序中的异常

由于事件处理程序客户端缺少适当的上下文来了解开发人员提供的事件处理程序中异常的严重性,因此它无法假设哪些操作是对其的合理响应。 因此,开发人员被视为对使用 try/catch 块和其他标准语言构造提供的事件处理程序中发生的异常负责。

事件处理程序客户端不会尝试检测开发人员代码中的异常,也不会显式显示这些异常。 结果行为将取决于处理器的托管环境和调用事件处理程序的上下文。 由于这可能因不同方案而异,因此强烈建议开发人员以防御方式编写其事件处理程序,并考虑潜在的异常。

日志记录和诊断

完全检测事件处理程序客户端库,以便使用 .NET EventSource 发出信息来记录各种详细级别的信息。 日志记录针对每个操作执行,并遵循标记操作的起点、操作完成和遇到的任何异常的模式。 可能提供见解的其他信息也会记录在关联操作的上下文中。

可以通过选择加入名为“Azure-Messaging-EventHubs-Processor-EventProcessorClient”的源或选择加入具有“AzureEventSource”特征的所有源来获取 EventListener 事件处理程序客户端日志。 为了更轻松地从 Azure 客户端库捕获日志, Azure.Core 事件中心使用的库提供了 AzureEventSourceListener。 有关详细信息,请参阅 使用 AzureEventSourceListener 捕获事件中心日志

还会使用 Application Insights 或 OpenTelemetry 检测事件处理程序库的分布式跟踪。 有关详细信息,请参阅 Azure.Core 诊断示例

后续步骤

除了讨论的方案之外,Azure 事件中心处理器库还提供对其他方案的支持,以帮助利用 的完整功能集EventProcessorClient。 为了帮助探索其中的某些方案,事件中心处理器客户端库提供了一个示例项目,作为常见方案的插图。 有关详细信息,请参阅示例 自述文件

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

有关详细信息,请参阅 我们的贡献指南

曝光数