你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Functions 的 Azure 事件中心触发器和绑定

本文介绍如何使用 Azure Functions 的 Azure 事件中心绑定。 Azure Functions 支持事件中心的触发器和输出绑定。

操作 类型
响应发送到事件中心事件流的事件。 触发器
将事件写入到事件流 输出绑定

安装扩展

你安装的扩展 NuGet 包取决于你在函数应用中使用的 C# 模式:

函数在独立的 C# 工作进程中执行。 若要了解详细信息,请参阅有关在独立工作进程中运行 C# Azure Functions 的指南

扩展的功能因扩展版本而异:

此版本引入了使用标识而不是机密进行连接的功能。 有关使用托管标识配置函数应用的教程,请参阅使用基于标识的连接创建函数应用教程

通过安装 NuGet 包版本 5.x 将该扩展添加到你的项目。

安装捆绑包

事件中心扩展是在 host.json 项目文件中指定的扩展捆绑包的一部分。 你可能需要修改此捆绑包以更改绑定的版本,或者如果尚未安装捆绑包。 若要了解详细信息,请参阅扩展捆绑包

此版本引入了使用标识而不是机密进行连接的功能。 有关使用托管标识配置函数应用的教程,请参阅使用基于标识的连接创建函数应用教程

可以通过在 host.json 文件中添加或替换以下代码,从扩展捆绑包 v3 添加该扩展的这一版本:

{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[3.3.0, 4.0.0)"
    }
}

若要了解详细信息,请参阅更新扩展

绑定类型

.NET 支持的绑定类型取决于扩展版本和 C# 执行模式,可以是以下类型之一:

独立工作进程类库的已编译 C# 函数在独立于运行时的进程中运行。

请选择一个版本来查看模式和版本的绑定类型详细信息。

独立工作进程支持下表所示的参数类型。 对绑定到 Azure.Messaging.EventHubs 中类型的支持目前处于预览阶段。

事件中心触发器

如果你希望函数处理单个事件,可将事件中心触发器绑定到以下类型:

类型 说明
string 字符串形式的事件。 当事件为简单文本时使用。
byte[] 事件的字节数。
JSON 可序列化类型 当事件包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。
Azure.Messaging.EventHubs.EventData1 事件对象。
如果要从事件中心 SDK 的任何旧版本迁移,请注意,此版本将放弃对旧 Body 类型的支持,转而支持 EventBody

如果你希望函数处理批量事件,可将事件中心触发器绑定到以下类型:

类型 说明
string[] 批处理中的事件数组,以字符串形式表示。 每个条目表示一个事件。
EventData[] 1 批处理中的事件数组,以 Azure.Messaging.EventHubs.EventData 的实例形式表示。 每个条目表示一个事件。
T[],其中 T 是 JSON 可序列化类型1 批处理中的事件数组,以自定义 POCO 类型的实例形式表示。 每个条目表示一个事件。

1 若要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 或更高版本以及 SDK 类型绑定的常见依赖项

事件中心输出绑定

如果希望函数写入单个事件,事件中心输出绑定可以绑定到以下类型:

类型 说明
string 字符串形式的事件。 当事件为简单文本时使用。
byte[] 事件的字节数。
JSON 可序列化类型 表示事件的对象。 函数尝试将普通的旧 CLR 对象 (POCO) 类型序列化为 JSON 数据。

如果希望函数写入多个事件,事件中心输出绑定可以绑定到以下类型:

类型 说明
T[],其中 T 是单事件类型之一 包含多个事件的数组。 每个条目表示一个事件。

对于其他输出方案,请直接从 Microsoft.Azure.EventHubs 创建和使用类型。

host.json 设置

host.json 文件包含控制事件中心触发器行为的设置。 配置因扩展版本而异。

{
    "version": "2.0",
    "extensions": {
        "eventHubs": {
            "maxEventBatchSize" : 100,
            "minEventBatchSize" : 25,
            "maxWaitTime" : "00:05:00",            
            "batchCheckpointFrequency" : 1,
            "prefetchCount" : 300,
            "transportType" : "amqpWebSockets",
            "webProxy" : "https://proxyserver:8080",
            "customEndpointAddress" : "amqps://company.gateway.local",
            "targetUnprocessedEventThreshold" : 75,
            "initialOffsetOptions" : {
                "type" : "fromStart",
                "enqueuedTimeUtc" : ""
            },
            "clientRetryOptions":{
                "mode" : "exponential",
                "tryTimeout" : "00:01:00",
                "delay" : "00:00:00.80",
                "maximumDelay" : "00:01:00",
                "maximumRetries" : 3
            }
        }
    }
}  
属性 默认 说明
maxEventBatchSize2 100 单次调用的批中包含的最大事件数。 必须至少为 1。
minEventBatchSize1 1 批中所需的最小事件数。 仅当函数接收多个事件并且事件数必须小于 maxEventBatchSize 时,最小事件数才适用。
不严格保证最小大小。 如果在 maxWaitTime 内无法准备好整批,则将调度部分批。 在缩放发生后,首次调用函数时,也可能会调度部分批。
maxWaitTime1 00:01:00 在调用函数之前触发器应等待填充批的最大时间间隔。 仅当 minEventBatchSize 大于 1 时才考虑等待时间,否则忽略。 如果在达到等待时间前可用事件数少于 minEventBatchSize,则会使用部分批调用函数。 允许的最长等待时间为 10 分钟。

注意:此间隔不能严格保证调用函数的确切时间。 由于计时器精度,可能存在很小的误差。 在缩放发生时,使用部分批进行的第一次调用可能会更快,或者最多需要所配置等待时间的两倍。
batchCheckpointFrequency 1 为事件中心创建检查点之前要处理的批数。
prefetchCount 300 从事件中心请求并在本地缓存中保留以支持读取避免等待网络操作的事件数
transportType amqpTcp 用于与事件中心通信的协议和传输。 可用选项:amqpTcpamqpWebSockets
webProxy Null 用于通过 Web 套接字与事件中心进行通信的代理。 代理不能与 amqpTcp 传输一起使用。
customEndpointAddress Null 与事件中心建立连接时要使用的地址,使网络请求可以通过应用程序网关或主机环境所需的其他路径路由。 使用自定义终结点地址时,仍然需要事件中心的完全限定命名空间,并且必须显式指定或通过连接字符串指定。
targetUnprocessedEventThreshold1 Null 每个函数实例所需的未处理事件数。 此阈值在基于目标的缩放中用于替代从 maxEventBatchSize 选项推断的默认缩放阈值。 设置后,将未处理事件总数除以此值,可确定所需的函数实例数。 实例计数将向上舍入为创建均衡分区分布的数字。
initialOffsetOptions/type fromStart 当存储中不存在检查点时要在事件流中开始处理事件的位置。 应用于所有分区。 有关详细信息,请参阅 OffsetType 文档。 可用选项:fromStartfromEndfromEnqueuedTime
initialOffsetOptions/enqueuedTimeUtc Null 指定在事件流中开始处理的事件的排队时间。 如果 initialOffsetOptions/type 配置为 fromEnqueuedTime,则必需此设置。 支持 DateTime.Parse() 支持的任何格式的时间,如 2020-10-26T20:31Z。 为清楚起见,还应指定时区。 如果未指定时区,Functions 将假设是运行函数应用的计算机的本地时区,即在 Azure 上运行时的 UTC。
clientRetryOptions/mode 指数 用于计算重试延迟的方法。 指数模式可根据一个退避策略来重试带延迟的尝试,该策略规定每次尝试都会增加重试前的等待时间。 固定模式会按固定间隔重试,并且每个延迟的持续时间一致。 可用选项:exponentialfixed
clientRetryOptions/tryTimeout 00:01:00 每次尝试时等待事件中心操作完成的最大持续时间。
clientRetryOptions/delay 00:00:00.80 要在两次重试之间应用的延迟或回退因子。
clientRetryOptions/maximumDelay 00:00:01 允许出现在两次重试之间的最大延迟。
clientRetryOptions/maximumRetries 3 将关联的操作视为失败之前的最大重试次数。

1 使用 minEventBatchSizemaxWaitTime 需要 v5.3.0 或更高版本的 Microsoft.Azure.WebJobs.Extensions.EventHubs 包。

2 默认的 maxEventBatchSizeMicrosoft.Azure.WebJobs.Extensions.EventHubs 包的 v6.0.0 中发生了更改。 在早期版本中,它是 10。

clientRetryOptions 用于在 Functions 主机和事件中心之间重试操作(例如提取事件和发送事件)。 有关将重试策略应用于各个函数的信息,请参阅关于 Azure Functions 错误处理和重试的指导。

有关 Azure Functions 2.x 及更高版本中的 host.json 参考,请参阅 Azure Functions 的 host.json 参考

后续步骤