Compartir por


Envío y recepción de mensajes entre Azure IoT MQ (versión preliminar) y Azure Event Hubs o Kafka

Importante

Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en VERSIÓN PRELIMINAR. No se debería usar este software en versión preliminar en entornos de producción.

Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.

El conector de Kafka inserta mensajes del agente MQTT de Azure IoT MQ (versión preliminar) en un punto de conexión de Kafka y, de forma similar, extrae mensajes de manera consecuente. Dado que Azure Event Hubs admite la API de Kafka, el conector funciona perfectamente con Event Hubs.

Configuración del conector de Event Hubs mediante el punto de conexión de Kafka

De forma predeterminada, el conector no está instalado con Azure IoT MQ. Debe habilitarse explícitamente con la asignación de temas y las credenciales de autenticación especificadas. Siga estos pasos para habilitar la comunicación bidireccional entre IoT MQ y Azure Event Hubs a través de su punto de conexión de Kafka.

  1. Cree un espacio de nombres de Event Hubs.

  2. Cree un centro de eventos para cada tema de Kafka.

Concesión del acceso del conector al espacio de nombres de Event Hubs

Conceder acceso a la extensión de Arc IoT MQ a un espacio de nombres de Event Hubs es la manera más cómoda de establecer una conexión segura desde el conector Kakfa de IoT MQ a Event Hubs.

Guarde la siguiente plantilla de Bicep en un archivo y aplíquela con la CLI de Azure después de establecer los parámetros válidos para su entorno:

Nota:

La plantilla de Bicep supone que el clúster conectado a Arc y el espacio de nombres de Event Hubs están en el mismo grupo de recursos, ajuste la plantilla si el entorno es diferente.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

KafkaConnector

El recurso personalizado (CR) de KafkaConnector permite configurar un conector de Kafka que pueda comunicar un host de Kafka y Event Hubs. El conector de Kafka puede transferir datos entre temas MQTT y temas de Kafka mediante Event Hubs como punto de conexión compatible con Kafka.

En el ejemplo siguiente se muestra un recurso personalizado de KafkaConnector que se conecta a un punto de conexión de Event Hubs mediante distintos tipos de autenticación. Se supone que otros recursos de MQ se instalaron mediante el inicio rápido:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

En la tabla siguiente, se describen los campos del recurso personalizado de KafkaConnector:

Campo Descripción Obligatorio
image La imagen del conector Kafka. Puede especificar pullPolicy, repository y tag de la imagen. Los valores predeterminados se muestran en el ejemplo anterior.
instances Número de instancias del conector de Kafka que se va a ejecutar.
clientIdPrefix Cadena que se va a anteponer a un identificador de cliente usado por el conector. No
kafkaConnection Detalles de conexión del punto de conexión de Event Hubs. Consulte Conexión de Kafka.
localBrokerConnection Detalles de conexión del agente local que invalida la conexión de agente predeterminada. Consulte Administrar la conexión de agente local. No
logLevel Nivel de registro del conector de Kafka. Los valores posibles son: trace, debug, info, warn, error, o fatal. El valor predeterminado es warn. No

Conexión de Kafka

El campo kafkaConnection define los detalles de conexión del punto de conexión de Kafka.

Campo Descripción Obligatorio
endpoint El host y el puerto del punto de conexión de Event Hubs. El puerto suele ser 9093. Puede especificar varios puntos de conexión separados por comas para usar la sintaxis de los servidores de arranque.
tls Configuración del cifrado TLS. Consulte TLS.
Autenticación Configuración de la autenticación. Consulte Autenticación. No

TLS

El campo tls habilita el cifrado TLS para la conexión y, de manera opcional, especifica una asignación de configuración de CA.

Campo Descripción Obligatorio
tlsEnabled Valor booleano que indica si el cifrado TLS está habilitado o no. Debe establecerse en true para la comunicación de Event Hubs.
trustedCaCertificateConfigMap Nombre del mapa de configuración que contiene el certificado CA para comprobar la identidad del servidor. Este campo no es necesario para la comunicación de Event Hubs, ya que Event Hubs usa CA conocidos y de confianza de forma predeterminada. Sin embargo, puede usar este campo si desea usar un certificado CA personalizado. No

Al especificar un CA de confianza, cree un ConfigMap que contenga la poción pública del CA en formato PEM y especifique el nombre en la propiedad trustedCaCertificateConfigMap.

kubectl create configmap ca-pem --from-file path/to/ca.pem

Autenticación

El campo de autenticación admite diferentes tipos de métodos de autenticación, como SASL, X509 o identidad administrada.

Campo Descripción Obligatorio
enabled Valor booleano que indica si la autenticación está habilitada o no.
authType Campo que contiene el tipo de autenticación usado. Consulte Tipo de autenticación
Tipo de autenticación
Campo Descripción Obligatorio
sasl Configuración de la autenticación SASL. Especifique saslType, que puede ser plain, scramSha256o scramSha512, y token para hacer referencia al secreto de Kubernetes secretName o Azure Key Vault keyVault que contiene la contraseña. Sí, si usa la autenticación SASL
systemAssignedManagedIdentity Configuración de la autenticación de identidad administrada. Especifique la audiencia de la solicitud de token, que debe coincidir con el espacio de nombres de Event Hubs (https://<NAMESPACE>.servicebus.windows.net) porque el conector es un cliente de Kafka. Se crea automáticamente una identidad administrada asignada por el sistema y se asigna al conector cuando está habilitada. Sí, si usa la autenticación de identidad administrada
x509 Configuración de la autenticación X509. Especifique el campo secretName o keyVault. El campo secretName es el nombre del secreto que contiene el certificado de cliente y la clave de cliente en formato PEM, almacenados como un secreto TLS. Sí, si usa la autenticación X509

Para aprender a usar Azure Key Vault y keyVault para administrar secretos de Azure IoT MQ en lugar de secretos de Kubernetes, consulte Administración de secretos mediante Azure Key Vault o secretos de Kubernetes.

Autenticación en Event Hubs

Para usar la identidad administrada, especifíquela como el único método en autenticación. También debe asignar un rol a la identidad administrada que concede permiso para enviar y recibir mensajes de Event Hubs, como el propietario de datos de Azure Event Hubs o el remitente o receptor de datos de Azure Event Hubs. Para más información, consulte Autenticación de una aplicación con Microsoft Entra ID para acceder a los recursos de Event Hubs.

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Administración de la conexión de agente local

Al igual que el puente MQTT, el conector de Event Hubs actúa como un cliente para el MQTT broker de IoT MQ. Si ha personalizado el puerto de escucha o la autenticación del MQTT broker de IoT MQ, reemplace también la configuración de conexión MQTT local para el conector de Event Hubs. Para más información, consulte conexión de agente local del puente MQTT.

KafkaConnectorTopicMap

El recurso personalizado KafkaConnectorTopicMap (CR) permite definir la asignación entre temas MQTT y temas de Kafka para la transferencia de datos bidireccional. Especifique una referencia a una CR de KafkaConnector y una lista de rutas. Cada ruta puede ser una ruta MQTT a Kafka o una ruta de Kafka a MQTT. Por ejemplo:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: none
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

En la tabla siguiente, se describen los campos del CR KafkaConnectorTopicMap:

Campo Descripción Obligatorio
kafkaConnectorRef Nombre del CR de KafkaConnector al que pertenece este mapa de tema.
compression Configuración para la compresión de los mensajes enviados a temas de Kafka. Consulte Compresión. No
procesamiento por lotes Configuración para el procesamiento por lotes de los mensajes enviados a temas de Kafka. Consulte Procesamiento por lotes. No
partitionStrategy Estrategia para controlar particiones de Kafka al enviar mensajes a temas de Kafka. Consulte Estrategia de control de particiones. No
copyMqttProperties Valor booleano para controlar si las propiedades del usuario y del sistema MQTT se copian en el encabezado de mensaje de Kafka. Las propiedades del usuario se copian tal como están. Algunas transformaciones se realizan con las propiedades del sistema. El valor predeterminado es false. No
rutas Una lista de rutas para la transferencia de datos entre temas MQTT y temas de Kafka. Cada ruta puede tener un campo mqttToKafka o kafkaToMqtt, en función de la dirección de la transferencia de datos. Consulte Rutas.

Compresión

El campo de compresión habilita la compresión para los mensajes enviados a temas de Kafka. La compresión ayuda a reducir el ancho de banda de red y el espacio de almacenamiento necesarios para la transferencia de datos. Sin embargo, la compresión también agrega cierta sobrecarga y latencia al proceso. Los valores de los tipos de compresión y la compatibilidad se enumeran en la tabla siguiente.

Valor Descripción Compatible
None No se aplica ninguna compresión ni procesamiento por lotes. none es el valor predeterminado si no se especifica ninguna compresión.
gzip Se aplican la compresión y el procesamiento por lotes de GZIP. GZIP es un algoritmo de compresión de uso general que ofrece un buen equilibrio entre la relación de compresión y la velocidad. Sí. Se requiere el plan de tarifa Event Hubs Premium para la compresión GZIP.
snappy Se aplican la compresión y el procesamiento por lotes de Snappy. Snappy es un algoritmo de compresión rápido que ofrece una relación de compresión moderada y velocidad. No es compatible con Azure Event Hubs. Use Apache Kafka.
lz4 Se aplican la compresión y el procesamiento por lotes LZ4. LZ4 es un algoritmo de compresión rápido que ofrece una relación de compresión baja y alta velocidad. No es compatible con Azure Event Hubs. Use Apache Kafka.

Procesamiento por lotes

Además de la compresión, también puede configurar el procesamiento por lotes para los mensajes antes de enviarlos a temas de Kafka. El procesamiento por lotes permite agrupar varios mensajes y comprimirlos como una sola unidad, lo que puede mejorar la eficacia de compresión y reducir la sobrecarga de red.

Campo Descripción Obligatorio
enabled Valor booleano que indica si el procesamiento por lotes está habilitado o no. Si no se establece, el valor predeterminado es false.
latencyMs Intervalo de tiempo máximo en milisegundos que los mensajes se pueden almacenar en búfer antes de enviarse. Si se alcanza este intervalo, todos los mensajes almacenados en búfer se envían como un lote, independientemente de su cantidad o tamaño. Si no se establece, el valor predeterminado es 5. No
maxMessages Número máximo de mensajes que se pueden almacenar en búfer antes de enviarlos. Si se alcanza este número, todos los mensajes almacenados en búfer se envían como un lote, independientemente del tamaño que tengan o de cuánto tiempo se almacenan en búfer. Si no se establece, el valor predeterminado es 100 000. No
maxBytes Tamaño máximo en bytes que se pueden almacenar en el búfer antes de enviarlos. Si se alcanza este tamaño, todos los mensajes almacenados en búfer se envían como un lote, independientemente de cuántos sean o de cuánto tiempo se almacenan en búfer. El valor predeterminado es 100 0000 (1 MB). No

Un ejemplo de uso del procesamiento por lotes es:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Esto significa que los mensajes se envían cuando hay 100 mensajes en el búfer, o cuando hay 1024 bytes en el búfer, o cuando transcurren 1000 milisegundos desde el último envío, lo que ocurra primero.

Estrategia de control de particiones

La estrategia de control de particiones es una característica que permite controlar cómo se asignan los mensajes a las particiones de Kafka al enviarlos a temas de Kafka. Las particiones de Kafka son segmentos lógicos de un tema de Kafka que habilitan el procesamiento paralelo y la tolerancia a errores. Cada mensaje de un tema de Kafka tiene una partición y un desplazamiento que se usan para identificar y ordenar los mensajes.

De forma predeterminada, el conector de Kafka asigna mensajes a particiones aleatorias mediante un algoritmo round robin. Sin embargo, puede usar diferentes estrategias para asignar mensajes a particiones en función de algunos criterios, como el nombre del tema MQTT o una propiedad de mensaje MQTT. Esto puede ayudarle a lograr un mejor equilibrio de carga, localidad de datos o ordenación de mensajes.

Valor Descripción
default Asigna mensajes a particiones aleatorias mediante un algoritmo round robin. Es el valor predeterminado si no se especifica ninguna estrategia.
static Asigna mensajes a un número fijo de partición derivado del identificador de instancia del conector. Esto significa que cada instancia del conector envía mensajes a una partición diferente. Esto puede ayudar a lograr un mejor equilibrio de carga y localidad de datos.
topic Usa el nombre del tema MQTT como clave para la creación de particiones. Esto significa que los mensajes con el mismo nombre de tema MQTT se envían a la misma partición. Esto puede ayudar a lograr una mejor ordenación de mensajes y localidad de datos.
propiedad Usa una propiedad de mensaje MQTT como clave para la creación de particiones. Especifique el nombre de la propiedad en el campo partitionKeyProperty. Esto significa que los mensajes con el mismo valor de propiedad se envían a la misma partición. Esto puede ayudar a lograr una mejor ordenación de mensajes y la localidad de datos en función de un criterio personalizado.

Un ejemplo de uso de la estrategia de control de particiones es:

partitionStrategy: property
partitionKeyProperty: device-id

Esto significa que los mensajes con la misma propiedad device-id se envían a la misma partición.

Rutas

El campo routes define una lista de rutas para la transferencia de datos entre temas MQTT y temas de Kafka. Cada ruta puede tener un campo mqttToKafka o kafkaToMqtt, en función de la dirección de la transferencia de datos.

MQTT a Kafka

El campo mqttToKafka define una ruta que transfiere datos de un tema MQTT a un tema de Kafka.

Campo Descripción Obligatorio
name Nombre único de la ruta.
mqttTopic El tema MQTT desde el que suscribirse. Puede usar caracteres comodín (# y +) para buscar coincidencias con varios temas.
kafkaTopic El tema de Kafka al que se va a enviar.
kafkaAcks El número de confirmaciones que requiere el conector desde el punto de conexión de Kafka. Los valores posibles son zero, one o all. No
qos Nivel de calidad de servicio (QoS) para la suscripción del tema MQTT. Los valores posibles son: 0 o 1 (valor predeterminado). QoS 2 no se admite actualmente.
sharedSubscription Configuración para el uso de suscripciones compartidas para temas de MQTT. Especifique groupName, que es un identificador único para un grupo de suscriptores y groupMinimumShareNumber, que es el número de suscriptores de un grupo que recibe mensajes de un tema. Por ejemplo, si groupName es "group1" y groupMinimumShareNumber es 3, el conector crea tres suscriptores con el mismo nombre de grupo para recibir mensajes de un tema. Esta característica permite distribuir mensajes entre varios suscriptores sin perder mensajes ni crear duplicados. No

Ejemplo de uso de ruta mqttToKafka:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

En este ejemplo, los mensajes de temas MQTT que coinciden con alertas de temperatura/# se envían al tema de Kafka receiving-event-hub con QoS equivalente a 1 y grupo de suscripciones compartido "group1" con el número de recurso compartido 3.

Kafka a MQTT

El campo kafkaToMqtt define una ruta que transfiere datos de un tema de Kafka a un tema MQTT.

Campo Descripción Obligatorio
name Nombre único de la ruta.
kafkaTopic El tema de Kafka del que se va a extraer.
mqttTopic El tema MQTT en el que se va a publicar.
consumerGroupId Prefijo del identificador del grupo de consumidores para cada ruta de Kafka a MQTT. Si no se establece, el identificador del grupo de consumidores se establece en el mismo que el nombre de ruta. No
qos Nivel de calidad de servicio (QoS) para los mensajes publicados en el tema MQTT. Los valores posibles son 0 o 1 (valor predeterminado). QoS 2 no se admite actualmente. Si QoS está establecido en 1, el conector publica el mensaje en el tema MQTT y, a continuación, espera la confirmación antes de confirmar el mensaje en Kafka. En el caso de QoS 0, el conector se confirma inmediatamente sin la confirmación MQTT. No

Ejemplo de uso de ruta kafkaToMqtt:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

En este ejemplo, los mensajes del tema de Kafka sending-event-hub se publican en el tema de MQTT heater-commands con el nivel 0 de QoS.

El nombre del centro de eventos debe coincidir con el tema de Kafka

Cada centro de eventos individual (no el espacio de nombres) debe denominarse exactamente igual que el tema de Kafka previsto especificado en las rutas. Además, la cadena de conexión EntityPath debe coincidir si la cadena de conexión tiene como ámbito un centro de eventos. Este requisito se debe a que el espacio de nombres de Event Hubs es análogo al clúster de Kafka y el nombre del centro de eventos es análogo a un tema de Kafka, por lo que el nombre del tema de Kafka debe coincidir con el nombre del centro de eventos.

Desplazamientos de grupos de consumidores de Kafka

Si el conector se desconecta o se quita y se vuelve a instalar con el mismo identificador de grupo de consumidores de Kafka, el desplazamiento del grupo de consumidores (la última posición desde donde los mensajes de lectura del consumidor de Kafka) se almacena en Azure Event Hubs. Para más información, consulte Grupo de consumidores de Event Hubs frente a grupo de consumidores de Kafka.

Versión de MQTT

Este conector solo usa MQTT v5.

Publicar y suscribir mensajes MQTT mediante Azure IoT MQ Preview