你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Azure Functions 的 Apache Kafka 绑定概述

使用适用于 Azure Functions 的 Kafka 扩展,你可以通过输出绑定将值写入到 Apache Kafka 主题。 还可以使用触发器来调用函数,以响应 Kafka 主题中的消息。

重要

Kafka 绑定仅可用于弹性高级计划专用(应用服务)计划中的 Functions。 它们仅在 3.x 版及更高版本的 Functions 运行时上受支持。

操作 类型
基于新的 Kafka 事件来运行函数。 触发器
写入到 Kafka 事件流。 输出绑定

安装扩展

你安装的扩展 NuGet 包取决于你在函数应用中使用的 C# 模式:

函数在独立的 C# 工作进程中执行。 若要了解详细信息,请参阅有关在独立工作进程中运行 C# Azure Functions 的指南

通过安装此 NuGet 包将该扩展添加到你的项目。

安装捆绑包

Kafka 扩展是在 host.json 项目文件中指定的扩展捆绑包的一部分。 在创建面向 Functions 3.x 或更高版本的项目时,应该已经安装好此捆绑包。 若要了解详细信息,请参阅扩展捆绑包

启用运行时缩放

若要允许函数在使用 Kafka 触发器和绑定时在高级计划上正确进行缩放,需要启用运行时缩放监视。

在 Azure 门户的函数应用中,选择“配置”,然后在“函数运行时设置”选项卡上将“运行时缩放监视”切换到“打开”。

用于启用运行时缩放的 Azure 门户面板的屏幕截图。

host.json 设置

本部分介绍 3.x 及更高版本中可用于此绑定的配置设置。 host.json 文件中的设置将应用于函数应用实例中的所有函数。 若要详细了解 3.x 及更高版本中的函数应用配置设置,请参阅 Azure Functions 的 host.json 参考

{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 64,
            "SubscriberIntervalInSeconds": 1,
            "ExecutorChannelCapacity": 1,
            "ChannelFullRetryIntervalInMs": 50
        }
    }
}

属性 默认 类型 说明
ChannelFullRetryIntervalInMs 50 触发器 定义尝试将项添加到满负荷通道时使用的订阅服务器重试间隔(以毫秒为单位)。
ExecutorChannelCapacity 1 两者 定义通道消息容量。 达到容量后,Kafka 订阅服务器会暂停,直到函数赶上。
MaxBatchSize 64 触发器 调用 Kafka 触发的函数时的最大批大小。
SubscriberIntervalInSeconds 1 触发器 定义每个函数执行传入消息的最小频率,按秒计算。 前提条件是消息量小于 MaxBatchSize / SubscriberIntervalInSeconds

host.json 的 kafka 节也支持继承自 Apache Kafka C/C++ 客户端库的以下属性,不管是对于触发器,还是对于输出绑定和触发器:

属性 适用于 librdkafka 等效项
AutoCommitIntervalMs 触发器 auto.commit.interval.ms
AutoOffsetReset 触发器 auto.offset.reset
FetchMaxBytes 触发器 fetch.max.bytes
LibkafkaDebug 两者 debug
MaxPartitionFetchBytes 触发器 max.partition.fetch.bytes
MaxPollIntervalMs 触发器 max.poll.interval.ms
MetadataMaxAgeMs 两者 metadata.max.age.ms
QueuedMinMessages 触发器 queued.min.messages
QueuedMaxMessagesKbytes 触发器 queued.max.messages.kbytes
ReconnectBackoffMs 触发器 reconnect.backoff.max.ms
ReconnectBackoffMaxMs 触发器 reconnect.backoff.max.ms
SessionTimeoutMs 触发器 session.timeout.ms
SocketKeepaliveEnable 两者 socket.keepalive.enable
StatisticsIntervalMs 触发器 statistics.interval.ms

后续步骤