Envío y recepción de mensajes entre Azure IoT MQ (versión preliminar) y Azure Event Hubs o Kafka
Importante
Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en VERSIÓN PRELIMINAR. No se debería usar este software en versión preliminar en entornos de producción.
Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.
El conector de Kafka inserta mensajes del agente MQTT de Azure IoT MQ (versión preliminar) en un punto de conexión de Kafka y, de forma similar, extrae mensajes de manera consecuente. Dado que Azure Event Hubs admite la API de Kafka, el conector funciona perfectamente con Event Hubs.
Configuración del conector de Event Hubs mediante el punto de conexión de Kafka
De forma predeterminada, el conector no está instalado con Azure IoT MQ. Debe habilitarse explícitamente con la asignación de temas y las credenciales de autenticación especificadas. Siga estos pasos para habilitar la comunicación bidireccional entre IoT MQ y Azure Event Hubs a través de su punto de conexión de Kafka.
Cree un centro de eventos para cada tema de Kafka.
Concesión del acceso del conector al espacio de nombres de Event Hubs
Conceder acceso a la extensión de Arc IoT MQ a un espacio de nombres de Event Hubs es la manera más cómoda de establecer una conexión segura desde el conector Kakfa de IoT MQ a Event Hubs.
Guarde la siguiente plantilla de Bicep en un archivo y aplíquela con la CLI de Azure después de establecer los parámetros válidos para su entorno:
Nota:
La plantilla de Bicep supone que el clúster conectado a Arc y el espacio de nombres de Event Hubs están en el mismo grupo de recursos, ajuste la plantilla si el entorno es diferente.
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
}
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
}
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
}
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
KafkaConnector
El recurso personalizado (CR) de KafkaConnector permite configurar un conector de Kafka que pueda comunicar un host de Kafka y Event Hubs. El conector de Kafka puede transferir datos entre temas MQTT y temas de Kafka mediante Event Hubs como punto de conexión compatible con Kafka.
En el ejemplo siguiente se muestra un recurso personalizado de KafkaConnector que se conecta a un punto de conexión de Event Hubs mediante distintos tipos de autenticación. Se supone que otros recursos de MQ se instalaron mediante el inicio rápido:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
En la tabla siguiente, se describen los campos del recurso personalizado de KafkaConnector:
Campo | Descripción | Obligatorio |
---|---|---|
image | La imagen del conector Kafka. Puede especificar pullPolicy , repository y tag de la imagen. Los valores predeterminados se muestran en el ejemplo anterior. |
Sí |
instances | Número de instancias del conector de Kafka que se va a ejecutar. | Sí |
clientIdPrefix | Cadena que se va a anteponer a un identificador de cliente usado por el conector. | No |
kafkaConnection | Detalles de conexión del punto de conexión de Event Hubs. Consulte Conexión de Kafka. | Sí |
localBrokerConnection | Detalles de conexión del agente local que invalida la conexión de agente predeterminada. Consulte Administrar la conexión de agente local. | No |
logLevel | Nivel de registro del conector de Kafka. Los valores posibles son: trace, debug, info, warn, error, o fatal. El valor predeterminado es warn. | No |
Conexión de Kafka
El campo kafkaConnection
define los detalles de conexión del punto de conexión de Kafka.
Campo | Descripción | Obligatorio |
---|---|---|
endpoint | El host y el puerto del punto de conexión de Event Hubs. El puerto suele ser 9093. Puede especificar varios puntos de conexión separados por comas para usar la sintaxis de los servidores de arranque. | Sí |
tls | Configuración del cifrado TLS. Consulte TLS. | Sí |
Autenticación | Configuración de la autenticación. Consulte Autenticación. | No |
TLS
El campo tls
habilita el cifrado TLS para la conexión y, de manera opcional, especifica una asignación de configuración de CA.
Campo | Descripción | Obligatorio |
---|---|---|
tlsEnabled | Valor booleano que indica si el cifrado TLS está habilitado o no. Debe establecerse en true para la comunicación de Event Hubs. | Sí |
trustedCaCertificateConfigMap | Nombre del mapa de configuración que contiene el certificado CA para comprobar la identidad del servidor. Este campo no es necesario para la comunicación de Event Hubs, ya que Event Hubs usa CA conocidos y de confianza de forma predeterminada. Sin embargo, puede usar este campo si desea usar un certificado CA personalizado. | No |
Al especificar un CA de confianza, cree un ConfigMap que contenga la poción pública del CA en formato PEM y especifique el nombre en la propiedad trustedCaCertificateConfigMap
.
kubectl create configmap ca-pem --from-file path/to/ca.pem
Autenticación
El campo de autenticación admite diferentes tipos de métodos de autenticación, como SASL, X509 o identidad administrada.
Campo | Descripción | Obligatorio |
---|---|---|
enabled | Valor booleano que indica si la autenticación está habilitada o no. | Sí |
authType | Campo que contiene el tipo de autenticación usado. Consulte Tipo de autenticación | Sí |
Tipo de autenticación
Campo | Descripción | Obligatorio |
---|---|---|
sasl | Configuración de la autenticación SASL. Especifique saslType , que puede ser plain, scramSha256o scramSha512, y token para hacer referencia al secreto de Kubernetes secretName o Azure Key Vault keyVault que contiene la contraseña. |
Sí, si usa la autenticación SASL |
systemAssignedManagedIdentity | Configuración de la autenticación de identidad administrada. Especifique la audiencia de la solicitud de token, que debe coincidir con el espacio de nombres de Event Hubs (https://<NAMESPACE>.servicebus.windows.net ) porque el conector es un cliente de Kafka. Se crea automáticamente una identidad administrada asignada por el sistema y se asigna al conector cuando está habilitada. |
Sí, si usa la autenticación de identidad administrada |
x509 | Configuración de la autenticación X509. Especifique el campo secretName o keyVault . El campo secretName es el nombre del secreto que contiene el certificado de cliente y la clave de cliente en formato PEM, almacenados como un secreto TLS. |
Sí, si usa la autenticación X509 |
Para aprender a usar Azure Key Vault y keyVault
para administrar secretos de Azure IoT MQ en lugar de secretos de Kubernetes, consulte Administración de secretos mediante Azure Key Vault o secretos de Kubernetes.
Autenticación en Event Hubs
Para usar la identidad administrada, especifíquela como el único método en autenticación. También debe asignar un rol a la identidad administrada que concede permiso para enviar y recibir mensajes de Event Hubs, como el propietario de datos de Azure Event Hubs o el remitente o receptor de datos de Azure Event Hubs. Para más información, consulte Autenticación de una aplicación con Microsoft Entra ID para acceder a los recursos de Event Hubs.
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
Administración de la conexión de agente local
Al igual que el puente MQTT, el conector de Event Hubs actúa como un cliente para el MQTT broker de IoT MQ. Si ha personalizado el puerto de escucha o la autenticación del MQTT broker de IoT MQ, reemplace también la configuración de conexión MQTT local para el conector de Event Hubs. Para más información, consulte conexión de agente local del puente MQTT.
KafkaConnectorTopicMap
El recurso personalizado KafkaConnectorTopicMap (CR) permite definir la asignación entre temas MQTT y temas de Kafka para la transferencia de datos bidireccional. Especifique una referencia a una CR de KafkaConnector y una lista de rutas. Cada ruta puede ser una ruta MQTT a Kafka o una ruta de Kafka a MQTT. Por ejemplo:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: none
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
En la tabla siguiente, se describen los campos del CR KafkaConnectorTopicMap:
Campo | Descripción | Obligatorio |
---|---|---|
kafkaConnectorRef | Nombre del CR de KafkaConnector al que pertenece este mapa de tema. | Sí |
compression | Configuración para la compresión de los mensajes enviados a temas de Kafka. Consulte Compresión. | No |
procesamiento por lotes | Configuración para el procesamiento por lotes de los mensajes enviados a temas de Kafka. Consulte Procesamiento por lotes. | No |
partitionStrategy | Estrategia para controlar particiones de Kafka al enviar mensajes a temas de Kafka. Consulte Estrategia de control de particiones. | No |
copyMqttProperties | Valor booleano para controlar si las propiedades del usuario y del sistema MQTT se copian en el encabezado de mensaje de Kafka. Las propiedades del usuario se copian tal como están. Algunas transformaciones se realizan con las propiedades del sistema. El valor predeterminado es false. | No |
rutas | Una lista de rutas para la transferencia de datos entre temas MQTT y temas de Kafka. Cada ruta puede tener un campo mqttToKafka o kafkaToMqtt , en función de la dirección de la transferencia de datos. Consulte Rutas. |
Sí |
Compresión
El campo de compresión habilita la compresión para los mensajes enviados a temas de Kafka. La compresión ayuda a reducir el ancho de banda de red y el espacio de almacenamiento necesarios para la transferencia de datos. Sin embargo, la compresión también agrega cierta sobrecarga y latencia al proceso. Los valores de los tipos de compresión y la compatibilidad se enumeran en la tabla siguiente.
Valor | Descripción | Compatible |
---|---|---|
None | No se aplica ninguna compresión ni procesamiento por lotes. none es el valor predeterminado si no se especifica ninguna compresión. | Sí |
gzip | Se aplican la compresión y el procesamiento por lotes de GZIP. GZIP es un algoritmo de compresión de uso general que ofrece un buen equilibrio entre la relación de compresión y la velocidad. | Sí. Se requiere el plan de tarifa Event Hubs Premium para la compresión GZIP. |
snappy | Se aplican la compresión y el procesamiento por lotes de Snappy. Snappy es un algoritmo de compresión rápido que ofrece una relación de compresión moderada y velocidad. | No es compatible con Azure Event Hubs. Use Apache Kafka. |
lz4 | Se aplican la compresión y el procesamiento por lotes LZ4. LZ4 es un algoritmo de compresión rápido que ofrece una relación de compresión baja y alta velocidad. | No es compatible con Azure Event Hubs. Use Apache Kafka. |
Procesamiento por lotes
Además de la compresión, también puede configurar el procesamiento por lotes para los mensajes antes de enviarlos a temas de Kafka. El procesamiento por lotes permite agrupar varios mensajes y comprimirlos como una sola unidad, lo que puede mejorar la eficacia de compresión y reducir la sobrecarga de red.
Campo | Descripción | Obligatorio |
---|---|---|
enabled | Valor booleano que indica si el procesamiento por lotes está habilitado o no. Si no se establece, el valor predeterminado es false. | Sí |
latencyMs | Intervalo de tiempo máximo en milisegundos que los mensajes se pueden almacenar en búfer antes de enviarse. Si se alcanza este intervalo, todos los mensajes almacenados en búfer se envían como un lote, independientemente de su cantidad o tamaño. Si no se establece, el valor predeterminado es 5. | No |
maxMessages | Número máximo de mensajes que se pueden almacenar en búfer antes de enviarlos. Si se alcanza este número, todos los mensajes almacenados en búfer se envían como un lote, independientemente del tamaño que tengan o de cuánto tiempo se almacenan en búfer. Si no se establece, el valor predeterminado es 100 000. | No |
maxBytes | Tamaño máximo en bytes que se pueden almacenar en el búfer antes de enviarlos. Si se alcanza este tamaño, todos los mensajes almacenados en búfer se envían como un lote, independientemente de cuántos sean o de cuánto tiempo se almacenan en búfer. El valor predeterminado es 100 0000 (1 MB). | No |
Un ejemplo de uso del procesamiento por lotes es:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
Esto significa que los mensajes se envían cuando hay 100 mensajes en el búfer, o cuando hay 1024 bytes en el búfer, o cuando transcurren 1000 milisegundos desde el último envío, lo que ocurra primero.
Estrategia de control de particiones
La estrategia de control de particiones es una característica que permite controlar cómo se asignan los mensajes a las particiones de Kafka al enviarlos a temas de Kafka. Las particiones de Kafka son segmentos lógicos de un tema de Kafka que habilitan el procesamiento paralelo y la tolerancia a errores. Cada mensaje de un tema de Kafka tiene una partición y un desplazamiento que se usan para identificar y ordenar los mensajes.
De forma predeterminada, el conector de Kafka asigna mensajes a particiones aleatorias mediante un algoritmo round robin. Sin embargo, puede usar diferentes estrategias para asignar mensajes a particiones en función de algunos criterios, como el nombre del tema MQTT o una propiedad de mensaje MQTT. Esto puede ayudarle a lograr un mejor equilibrio de carga, localidad de datos o ordenación de mensajes.
Valor | Descripción |
---|---|
default | Asigna mensajes a particiones aleatorias mediante un algoritmo round robin. Es el valor predeterminado si no se especifica ninguna estrategia. |
static | Asigna mensajes a un número fijo de partición derivado del identificador de instancia del conector. Esto significa que cada instancia del conector envía mensajes a una partición diferente. Esto puede ayudar a lograr un mejor equilibrio de carga y localidad de datos. |
topic | Usa el nombre del tema MQTT como clave para la creación de particiones. Esto significa que los mensajes con el mismo nombre de tema MQTT se envían a la misma partición. Esto puede ayudar a lograr una mejor ordenación de mensajes y localidad de datos. |
propiedad | Usa una propiedad de mensaje MQTT como clave para la creación de particiones. Especifique el nombre de la propiedad en el campo partitionKeyProperty . Esto significa que los mensajes con el mismo valor de propiedad se envían a la misma partición. Esto puede ayudar a lograr una mejor ordenación de mensajes y la localidad de datos en función de un criterio personalizado. |
Un ejemplo de uso de la estrategia de control de particiones es:
partitionStrategy: property
partitionKeyProperty: device-id
Esto significa que los mensajes con la misma propiedad device-id se envían a la misma partición.
Rutas
El campo routes define una lista de rutas para la transferencia de datos entre temas MQTT y temas de Kafka. Cada ruta puede tener un campo mqttToKafka
o kafkaToMqtt
, en función de la dirección de la transferencia de datos.
MQTT a Kafka
El campo mqttToKafka
define una ruta que transfiere datos de un tema MQTT a un tema de Kafka.
Campo | Descripción | Obligatorio |
---|---|---|
name | Nombre único de la ruta. | Sí |
mqttTopic | El tema MQTT desde el que suscribirse. Puede usar caracteres comodín (# y + ) para buscar coincidencias con varios temas. |
Sí |
kafkaTopic | El tema de Kafka al que se va a enviar. | Sí |
kafkaAcks | El número de confirmaciones que requiere el conector desde el punto de conexión de Kafka. Los valores posibles son zero , one o all . |
No |
qos | Nivel de calidad de servicio (QoS) para la suscripción del tema MQTT. Los valores posibles son: 0 o 1 (valor predeterminado). QoS 2 no se admite actualmente. | Sí |
sharedSubscription | Configuración para el uso de suscripciones compartidas para temas de MQTT. Especifique groupName , que es un identificador único para un grupo de suscriptores y groupMinimumShareNumber , que es el número de suscriptores de un grupo que recibe mensajes de un tema. Por ejemplo, si groupName es "group1" y groupMinimumShareNumber es 3, el conector crea tres suscriptores con el mismo nombre de grupo para recibir mensajes de un tema. Esta característica permite distribuir mensajes entre varios suscriptores sin perder mensajes ni crear duplicados. |
No |
Ejemplo de uso de ruta mqttToKafka
:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
En este ejemplo, los mensajes de temas MQTT que coinciden con alertas de temperatura/# se envían al tema de Kafka receiving-event-hub con QoS equivalente a 1 y grupo de suscripciones compartido "group1" con el número de recurso compartido 3.
Kafka a MQTT
El campo kafkaToMqtt
define una ruta que transfiere datos de un tema de Kafka a un tema MQTT.
Campo | Descripción | Obligatorio |
---|---|---|
name | Nombre único de la ruta. | Sí |
kafkaTopic | El tema de Kafka del que se va a extraer. | Sí |
mqttTopic | El tema MQTT en el que se va a publicar. | Sí |
consumerGroupId | Prefijo del identificador del grupo de consumidores para cada ruta de Kafka a MQTT. Si no se establece, el identificador del grupo de consumidores se establece en el mismo que el nombre de ruta. | No |
qos | Nivel de calidad de servicio (QoS) para los mensajes publicados en el tema MQTT. Los valores posibles son 0 o 1 (valor predeterminado). QoS 2 no se admite actualmente. Si QoS está establecido en 1, el conector publica el mensaje en el tema MQTT y, a continuación, espera la confirmación antes de confirmar el mensaje en Kafka. En el caso de QoS 0, el conector se confirma inmediatamente sin la confirmación MQTT. | No |
Ejemplo de uso de ruta kafkaToMqtt
:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
En este ejemplo, los mensajes del tema de Kafka sending-event-hub se publican en el tema de MQTT heater-commands con el nivel 0 de QoS.
El nombre del centro de eventos debe coincidir con el tema de Kafka
Cada centro de eventos individual (no el espacio de nombres) debe denominarse exactamente igual que el tema de Kafka previsto especificado en las rutas. Además, la cadena de conexión EntityPath
debe coincidir si la cadena de conexión tiene como ámbito un centro de eventos. Este requisito se debe a que el espacio de nombres de Event Hubs es análogo al clúster de Kafka y el nombre del centro de eventos es análogo a un tema de Kafka, por lo que el nombre del tema de Kafka debe coincidir con el nombre del centro de eventos.
Desplazamientos de grupos de consumidores de Kafka
Si el conector se desconecta o se quita y se vuelve a instalar con el mismo identificador de grupo de consumidores de Kafka, el desplazamiento del grupo de consumidores (la última posición desde donde los mensajes de lectura del consumidor de Kafka) se almacena en Azure Event Hubs. Para más información, consulte Grupo de consumidores de Event Hubs frente a grupo de consumidores de Kafka.
Versión de MQTT
Este conector solo usa MQTT v5.
Contenido relacionado
Publicar y suscribir mensajes MQTT mediante Azure IoT MQ Preview
Comentarios
https://aka.ms/ContentUserFeedback.
Proximamente: Ao longo de 2024, retiraremos gradualmente GitHub Issues como mecanismo de comentarios sobre o contido e substituirémolo por un novo sistema de comentarios. Para obter máis información, consulte:Enviar e ver os comentarios