你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在 Azure IoT MQ 预览版与 Azure 事件中心或 Kafka 之间发送和接收消息

重要

Azure IoT 操作预览版(由 Azure Arc 启用)当前处于预览状态。 不应在生产环境中使用此预览版软件。

有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款

Kafka 连接器将消息从 Azure IoT MQ 预览版的 MQTT 代理推送到 Kafka 终结点,并以类似的方式在另一端拉取消息。 由于 Azure 事件中心支持 Kafka API,连接器可直接与事件中心一起使用。

通过 Kafka 终结点配置事件中心连接器

默认情况下,连接器未随 Azure IoT MQ 一起安装。 必须使用指定的主题映射和身份验证凭据显式启用它。 按照以下步骤通过 Kafka 终结点在 IoT MQ 和 Azure 事件中心启用双向通信。

  1. 创建事件中心命名空间

  2. 为每个 Kafka 主题创建事件中心

授予连接器对事件中心命名空间的访问权限

授予 IoT MQ Arc 扩展对事件中心命名空间的访问权限是建立从 IoT MQ 的 Kakfa 连接器到事件中心的安全连接的最便捷方法。

将以下 Bicep 模板保存到文件,并在为环境设置有效参数后将其应用于 Azure CLI:

注意

Bicep 模板假定 Arc 连接群集和事件中心命名空间位于同一资源组中,如果环境不同,请调整模板。

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

KafkaConnector

KafkaConnector 自定义资源 (CR) 允许配置可以与 Kafka 主机和事件中心通信的 Kafka 连接器。 Kafka 连接器可以使用事件中心作为 Kafka 兼容的终结点在 MQTT 主题和 Kafka 主题之间传输数据。

以下示例演示使用 IoT MQ 的 Azure 标识连接到事件中心终结点的 KafkaConnector CR,它假定使用快速入门安装了其他 MQ 资源:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

下表描述了 KafkaConnector CR 中的字段:

字段 说明 必须
image Kafka 连接器的图像。 可以指定映像的 pullPolicyrepositorytag。 上表列出了默认值。
instances 要运行的 Kafka 连接器的实例数。
clientIdPrefix 要追加到连接器使用的客户端 ID 的字符串。
kafkaConnection 事件中心终结点的连接详细信息。 请参阅 Kafka 连接
localBrokerConnection 替代默认代理连接的本地代理的连接详细信息。 请参阅管理本地代理连接
logLevel Kafka 连接器的日志级别。 可能的值包括:跟踪调试信息警告错误致命。 默认值为“警告”。

Kafka 连接

kafkaConnection 字段定义 Kafka 终结点的连接详细信息。

字段 说明 必须
endpoint 事件中心终结点的主机和端口。 端口通常是 9093。 可以指定用逗号分隔的多个终结点,以使用启动服务器语法。
tls TLS 加密的配置。 请参阅 TLS
authentication 身份验证的配置。 请参阅身份验证

TLS

tls 字段为连接启用 TLS 加密,并选择性地指定 CA 配置映射。

字段 说明 必须
tlsEnabled 指示是否启用了 TLS 加密的布尔值。 对于事件中心通信,必须将其设置为 true。
trustedCaCertificateConfigMap 配置映射的名称,其中包含用于验证服务器的标识的 CA 证书。 事件中心通信不需要此字段,因为事件中心默认使用受信任的已知 CA。 但是,如果要使用自定义 CA 证书,可以使用此字段。

指定受信任的 CA 时,请创建一个包含 PEM 格式的 CA 公共部分的 ConfigMap,并在 trustedCaCertificateConfigMap 属性中指定名称。

kubectl create configmap ca-pem --from-file path/to/ca.pem

身份验证

身份验证字段支持不同类型的身份验证方法,例如 SASL、X509 或托管标识。

字段 说明 必须
enabled 指示是否启用了身份验证的布尔值。
authType 包含所使用的身份验证类型的字段。 请参阅身份验证类型
身份验证类型
字段 说明 必须
sasl SASL 身份验证的配置。 指定 saslType,它可以是纯文本、scramSha256 或 scramSha512,以及指定 token 以引用包含密码的 Kubernetes secretName 或 Azure Key Vault keyVault 机密 是的,如果使用 SASL 身份验证
systemAssignedManagedIdentity 托管标识身份验证的配置。 指定令牌请求的访问群体,该请求必须与事件中心命名空间(https://<NAMESPACE>.servicebus.windows.net匹配,因为连接器是 Kafka 客户端。 系统分配的托管标识会在启用后自动创建并分配给连接器。 是的,如果使用托管标识身份验证
x509 X509 身份验证的配置。 指定 secretNamekeyVault 字段。 secretName 字段是机密的名称,其中包含客户端证书和 PEM 格式的客户端密钥,存储为 TLS 机密。 是的,如果使用 X509 身份验证

若要了解如何使用 Azure Key Vault 和 keyVault 来管理 Azure IoT MQ 的机密而不是 Kubernetes 机密,请参阅使用 Azure Key Vault 或 Kubernetes 机密管理机密

向事件中心进行身份验证

要使用连接字符串和 Kubernetes 机密连接到事件中心,请使用 plain SASL 类型和 $ConnectionString 作为用户名,并将完整连接字符串用作密码。 首先,创建 Kubernetes 机密:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

然后,在配置中引用该机密:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

若要使用 Azure Key Vault 而不是 Kubernetes 机密,请使用连接字符串 Endpoint=sb://.. 创建一个 Azure Key Vault 机密,使用 vaultSecret 引用该机密,并在配置中将用户名指定为 "$ConnectionString"

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

要使用托管标识,请将其指定为身份验证下的唯一方法。 还需要将角色分配给托管标识,该标识授予从事件中心(例如 Azure 事件中心数据所有者或 Azure 事件中心数据发送方/接收方)发送和接收消息的权限。 要了解详情,请参阅使用 Microsoft Entra ID 对访问事件中心资源的应用程序进行身份验证

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

对于 X.509,请使用包含公共证书和私钥的 Kubernetes TLS 机密。

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

然后,在配置中指定 secretName

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

若要改用 Azure Key Vault,请确保证书和私钥已正确导入,然后使用 vaultCert 指定引用。

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

或者,如果需要呈现完整链,请将完整链证书和密钥作为 PFX 文件上传到 AKV,并改用 vaultCaChainSecret 字段。

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

管理本地代理连接

与 MQTT 桥一样,事件中心连接器充当 IoT MQ MQTT 代理的客户端。 如果已自定义 IoT MQ MQTT 代理的侦听器端口和/或身份验证,请同时替代事件中心连接器的本地 MQTT 连接配置。 要了解详细信息,请参阅 MQTT 桥本地代理连接

KafkaConnectorTopicMap

通过 KafkaConnectorTopicMap 自定义资源 (CR),可以定义 MQTT 主题与 Kafka 主题之间的映射,以便进行双向数据传输。 指定对 KafkaConnector CR 和路由列表的引用。 每个路由可以是 MQTT 到 Kafka 路由,也可以是 Kafka 到 MQTT 路由。 例如:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

下表描述了 KafkaConnectorTopicMap CR 中的字段:

字段 说明 必须
kafkaConnectorRef 本主题映射所属的 KafkaConnector CR 的名称。
compression 发送到 Kafka 主题的消息的压缩配置。 请参阅压缩
批处理 将消息批量发送到 Kafka 主题的配置。 请参阅批处理
partitionStrategy 将消息发送到 Kafka 主题时处理 Kafka 分区的策略。 请参阅分区处理策略
copyMqttProperties 用于控制 MQTT 系统和用户属性是否复制到 Kafka 消息标头的布尔值。 用户属性按原样复制。 某些转换是使用系统属性完成的。 默认为 false。
routes MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafkakafkaToMqtt 字段,具体取决于数据传输的方向。 请参阅 路由

压缩

压缩字段可为发送到 Kafka 主题的消息启用压缩配置。 压缩有助于减少数据传输所需的网络带宽和存储空间。 但是,压缩还会给进程增加一些开销和延迟。 下表中列出了支持的压缩类型。

说明
不应用压缩或批处理。 如果未指定压缩,则默认值为“”。
gzip 应用 GZIP 压缩和批处理。 GZIP 是一种通用压缩算法,提供压缩比率和速度之间的良好平衡。
snappy 应用 Snappy 压缩和批处理。 Snappy 是一种快速压缩算法,可提供中等压缩比率和速度。
lz4 应用 LZ4 压缩和批处理。 LZ4 是一种快速压缩算法,可提供低压缩比率和高速度。

批处理

除了压缩之外,还可以在将消息发送到 Kafka 主题之前为消息配置批处理。 批处理允许将多个消息组合在一起,并将其压缩为单个单元,从而提高压缩效率并减少网络开销。

字段 说明 必须
enabled 指示是否启用了批处理的布尔值。 如果未设置,则默认值为 false。
latencyMs 消息在发送前可以缓冲的最大时间间隔(以毫秒为单位)。 如果达到此间隔,则所有缓冲消息都会通过批处理发送,而不管它们的数量或大小。 如果未设置,则默认值为 5。
maxMessages 在发送之前可以缓冲的最大消息数。 如果达到此数量,则所有缓冲消息都会通过批处理发送,而不管它们的大小或缓冲时间。 如果未设置,则默认值为 100000。
maxBytes 在发送之前可以缓冲的最大大小(以字节为单位)。 如果达到此大小,则所有缓冲消息都会通过批处理发送,而不管它们的数量或缓冲时间。 默认值为 1000000 (1 MB)。

下面是使用批处理的示例:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

这意味着,当缓冲区中有 100 条消息,缓冲区中有 1024 个字节时,或距离上次发送已有 1000 毫秒时(不论先达到哪个条件),将发送消息。

分区处理策略

分区处理策略是一项功能,可用于控制将消息发送到 Kafka 主题时如何将其分配给 Kafka 分区。 Kafka 分区是支持并行处理和容错的 Kafka 主题的逻辑段。 Kafka 主题中的每个消息都有一个分区和一个偏移量,用于标记消息并进行排序。

默认情况下,Kafka 连接器使用轮循机制算法将消息分配给随机分区。 但是,你可以使用不同的策略根据某些条件(例如 MQTT 主题名称或 MQTT 消息属性)将消息分配到分区。 这有助于实现更好的负载均衡、数据区域或消息排序。

说明
default 使用轮循机制算法将消息分配给随机分区。 如果未指定策略,则使用默认值。
static 将消息分配给派生自连接器实例 ID 的固定分区号。 这意味着每个连接器实例都会将消息发送到不同的分区。 这有助于实现更好的负载均衡和数据区域。
主题 使用 MQTT 主题名称作为分区的键。 这意味着具有相同 MQTT 主题名称的消息将发送到同一分区。 这有助于实现更好的消息排序和数据区域。
property 使用 MQTT 消息属性作为分区的键。 指定 partitionKeyProperty 字段中的属性名称。 这意味着具有相同属性值的消息将发送到同一分区。 这有助于根据自定义条件实现更好的消息排序和数据区域。

使用分区处理策略的示例是:

partitionStrategy: property
partitionKeyProperty: device-id

这意味着具有相同设备 ID 的消息将发送到同一分区。

路由

路由字段定义 MQTT 主题和 Kafka 主题之间数据传输的路由列表。 每个路由都可以有 mqttToKafkakafkaToMqtt 字段,具体取决于数据传输的方向。

MQTT 到 Kafka

mqttToKafka 字段定义将数据从 MQTT 主题传输到 Kafka 主题的路由。

字段 说明 必需
name 路由的唯一名称。
mqttTopic 要从中订阅的 MQTT 主题。 可以使用通配符(#+)来匹配多个主题。
kafkaTopic 要发送到的 Kafka 主题。
kafkaAcks 连接器需要 Kafka 终结点的确认数。 可能的值为 zerooneall
qos MQTT 主题订阅的服务质量 (QoS) 级别。 可能的值:0 或 1(默认值)。 目前不支持 QoS 2。
sharedSubscription 将共享订阅用于 MQTT 主题的配置。 指定 groupName,这是一组订阅者的唯一标识符,以及指定 groupMinimumShareNumber,这是从主题接收消息的组中的订阅服务器数量。 例如,如果 groupName 为“group1”且 groupMinimumShareNumber 为 3,则连接器会创建三个具有相同组名称的订阅服务器,以便从主题接收消息。 此功能允许在多个订阅服务器之间分发消息,而不会丢失任何消息或创建重复消息。

使用 mqttToKafka 路由的示例:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

在此示例中,与 temperature-alerts/# 匹配的 MQTT 主题的消息发送到 Kafka 主题 receiving-event-hub(其中 QoS 等效于 1)和共享订阅组“group1”(其中共享数量为 3)。

Kafka 到 MQTT

kafkaToMqtt 字段定义将数据从 Kafka 主题传输到 MQTT 主题的路由。

字段 说明 必需
name 路由的唯一名称。
kafkaTopic 要从中拉取消息的 Kafka 主题。
mqttTopic 要将消息发布到其中的 MQTT 主题。
consumerGroupId 每个 Kafka 到 MQTT 路由的使用者组 ID 的前缀。 如果未设置,使用者组 ID 将设置为与路由名称相同的 ID。
qos 发送到 MQTT 主题的消息的服务质量 (QoS) 级别。 可能的值为 0 或 1(默认值)。 目前不支持 QoS 2。 如果 QoS 设置为 1,连接器会将消息发布到 MQTT 主题,然后在将消息提交回 Kafka 之前等待确认。 对于 QoS 0,连接器会立即提交回 Kafka,无需 MQTT 确认。

使用 kafkaToMqtt 路由的示例:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

在此示例中,来自 Kafka 主题 sending-event-hub* 的消息将发布到 MQTT 主题 heater-commands,且 QoS 级别为 0。

事件中心名称必须与 Kafka 主题匹配

每个单独的事件中心(而非命名空间)都必须与路由中指定的预期 Kafka 主题的名称完全相同。 此外,如果连接字符串限定为一个事件中心,则连接字符串 EntityPath 必须相匹配。 此要求是因为事件中心命名空间类似于 Kafka 群集,事件中心名称类似于 Kafka 主题,因此 Kafka 主题名称必须与事件中心名称匹配。

Kafka 使用者组偏移。

如果连接器断开或被删除,并使用同一 Kafka 使用者组 ID 重新安装,则使用者组偏移量(Kafka 使用者读取消息的最后一个位置)将存储在 Azure 事件中心。 要了解详细信息,请参阅事件中心使用者组与Kafka 使用者组

MQTT 版本

此连接器仅使用 MQTT v5。

使用 Azure IoT MQ 预览版发布和订阅 MQTT 消息