Freigeben über


Senden und Empfangen von Nachrichten zwischen Azure IoT MQ Preview und Azure Event Hubs oder Kafka

Wichtig

Die von Azure Arc aktivierte Azure IoT Operations Preview befindet sich derzeit in der VORSCHAU. Sie sollten diese Vorschausoftware nicht in Produktionsumgebungen verwenden.

Die zusätzlichen Nutzungsbestimmungen für Microsoft Azure-Vorschauen enthalten rechtliche Bedingungen. Sie gelten für diejenigen Azure-Features, die sich in der Beta- oder Vorschauversion befinden oder aber anderweitig noch nicht zur allgemeinen Verfügbarkeit freigegeben sind.

Der Kafka-Connector pusht Nachrichten vom MQTT Vermittler von Azure IoT MQ Preview an einen Kafka-Endpunkt und pullt Nachrichten auf ähnliche Weise umgekehrt ab. Da Azure Event Hubs die Kafka-API unterstützt, funktioniert der Connector standardmäßig sofort mit Event Hubs.

Konfigurieren des Event Hubs-Connectors über den Kafka-Endpunkt

Der Connector wird nicht standardmäßig mit Azure IoT MQ installiert. Er muss explizit aktiviert werden, wobei die Themenzuordnung und die Authentifizierungsanmeldeinformationen angegeben werden. Führen Sie diese Schritte aus, um die bidirektionale Kommunikation zwischen IoT MQ und Azure Event Hubs über den Kafka-Endpunkt zu ermöglichen.

  1. Erstellen eines Event Hubs-Namespace.

  2. Erstellen Sie einen Event Hub für jedes Kafka-Thema.

Gewähren von Zugriff des Connectors auf den Event-Hubs-Namespace

Das Gewähren des Zugriffs der IoT MQ Arc-Erweiterung auf einen Event Hubs-Namespace ist die bequemste Möglichkeit, eine sichere Verbindung vom Kafka-Connector von IoT MQ zu Event Hubs herzustellen.

Speichern Sie die folgende Bicep-Vorlage in einer Datei, und wenden Sie sie mit der Azure CLI an, nachdem Sie die gültigen Parameter für Ihre Umgebung festgelegt haben:

Hinweis

Die Bicep-Vorlage geht davon aus, dass sich der mit Arc verbundene Cluster und der Event Hubs-Namespace in derselben Ressourcengruppe befinden; passen Sie die Vorlage an, wenn Ihre Umgebung anders ist.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

KafkaConnector

Mit der benutzerdefinierten Ressource (Custom Resource, CR) KafkaConnector können Sie einen Kafka-Connector konfigurieren, der einen Kafka-Host und Event Hubs kommunizieren kann. Der Kafka-Connector kann Daten zwischen MQTT-Themen und Kafka-Themen übertragen, wobei die Event Hubs als Kafka-kompatibler Endpunkt verwendet werden.

Das folgende Beispiel zeigt eine KafkaConnector-CR, die mithilfe verschiedener Authentifizierungstypen eine Verbindung mit einem Event Hubs-Endpunkt herstellt. Es wird davon ausgegangen, dass andere MQ-Ressourcen mithilfe der Schnellstartanleitung installiert wurden:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Die folgenden Tabelle beschreibt die Felder in der benutzerdefinierten KafkaConnector-Ressource:

Feld Beschreibung Erforderlich
image Das Bild des Kafka-Connectors. Sie können pullPolicy, repository und tag für das Bild angeben. Standardwerte werden im vorherigen Beispiel angezeigt. Ja
instances Die Anzahl der Instanzen des auszuführenden Kafka-Connectors. Ja
clientIdPrefix Die Zeichenfolge, die einer Client-ID vorangestellt werden soll, die vom Connector verwendet wird. Nein
kafkaConnection Die Verbindungsdetails des Event Hubs-Endpunkts. Siehe Kafka-Verbindung. Ja
localBrokerConnection Die Verbindungsdetails des lokalen Brokers, der die Standardbrokerverbindung außer Kraft setzt. Siehe Verwalten der lokalen Brokerverbindung. Nein
logLevel Die Protokollebene des Kafka-Connectors. Mögliche Werte sind: trace (Ablaufverfolgung), debug (Debug), info (Information), warn (Warnung), error (Fehler) oder fatal (schwerwiegend). Der Standardwert ist warn. Nein

Kafka-Verbindung

Das Feld kafkaConnection definiert die Verbindungsdetails des Kafka-Endpunkts.

Feld Beschreibung Erforderlich
endpoint Der Host und Port des Event Hubs-Endpunkts. Der Port ist in der Regel 9093. Sie können mehrere Endpunkte angeben, die durch Kommas getrennt sind, um eine Bootstrapserver-Syntax zu verwenden. Ja
tls Die Konfiguration für die TLS-Verschlüsselung. Siehe TLS. Ja
authentication Die Konfiguration für die Authentifizierung. Siehe Authentifizierung. Nein

TLS

Das Feld tls aktiviert die TLS-Verschlüsselung für die Verbindung und gibt optional eine ConfigMap für die ZS an.

Feld Beschreibung Erforderlich
tlsEnabled Ein boolescher Wert, der angibt, ob die TLS-Verschlüsselung aktiviert ist oder nicht. Er muss für die Event Hubs-Kommunikation auf „true“ festgelegt werden. Ja
trustedCaCertificateConfigMap Der Name der ConfigMap, die das ZS-Zertifikat für die Überprüfung der Serveridentität enthält. Dieses Feld ist für die Kommunikation zwischen Event Hubs nicht erforderlich, da Event Hubs bekannte Zertifizierungsstellen verwendet, die standardmäßig als vertrauenswürdig eingestuft werden. Sie können dieses Feld jedoch verwenden, wenn Sie ein benutzerdefiniertes ZS-Zertifikat verwenden möchten. Nein

Wenn Sie eine vertrauenswürdige ZS angeben, erstellen Sie eine ConfigMap mit dem öffentlichen Teil der ZS im PEM-Format, und geben Sie den Namen in der trustedCaCertificateConfigMap-Eigenschaft an.

kubectl create configmap ca-pem --from-file path/to/ca.pem

Authentifizierung

Das Authentifizierungsfeld unterstützt verschiedene Typen von Authentifizierungsmethoden, z. B. SASL, X509 oder verwaltete Identität.

Feld Beschreibung Erforderlich
enabled Ein boolescher Wert, der angibt, ob die Authentifizierung aktiviert ist oder nicht. Ja
authType Ein Feld, das den verwendeten Authentifizierungstyp enthält. Siehe Authentifizierungstyp Ja
Authentifizierungstyp
Feld Beschreibung Erforderlich
sasl Die Konfiguration für die SASL-Authentifizierung. Geben Sie die saslType an, die einfach, scramSha256 oder scramSha512 sein kann, und token, um auf den geheimen Schlüssel von Kubernetes secretName oder den Azure Key Vault keyVault zu verweisen, der das Kennwort enthält. Ja, bei Verwendung der SASL-Authentifizierung
systemAssignedManagedIdentity Die Konfiguration für die Authentifizierung mit verwalteter Identität. Geben Sie die Zielgruppe für die Tokenanforderung an, die mit dem Event Hubs-Namespace (https://<NAMESPACE>.servicebus.windows.net) übereinstimmen muss, da der Connector ein Kafka-Client ist. Eine systemseitig zugewiesene verwaltete Identität wird automatisch erstellt und dem Connector zugewiesen, wenn sie aktiviert wird. Ja, bei Verwendung der Authentifizierung mit verwalteter Identität
x509 Die Konfiguration für die X509-Authentifizierung. Geben Sie das Feld secretName oder keyVault an. Das Feld secretName ist der Name des Geheimnisses, welches das Clientzertifikat und den Clientschlüssel im PEM-Format enthält, gespeichert als TLS-Geheimnis. Ja, bei Verwendung der X509-Authentifizierung

Informationen zum Verwenden von Azure Key Vault und der keyVault zum Verwalten von Geheimschlüsseln für Azure IoT MQ anstelle von Kubernetes-Geheimnissen finden Sie unter Verwalten von geheimen Schlüsseln mithilfe von Azure Key Vault- oder Kubernetes-Geheimnissen.

Authentifizieren bei Event Hubs

Um eine verwaltete Identität zu verwenden, geben Sie sie als einzige Authentifizierungsmethode an. Außerdem müssen Sie der verwalteten Identität eine Rolle zuweisen, die die Berechtigung zum Senden und Empfangen von Nachrichten von Event Hubs gewährt, z. B. Azure Event Hubs-Datenbesitzer oder Azure Event Hubs-Datenabsender/-empfänger. Weitere Informationen finden Sie unter Authentifizieren einer Anwendung mit Microsoft Entra ID für den Zugriff auf Event Hubs-Ressourcen.

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Verwalten der lokalen Brokerverbindung

Wie die MQTT-Brücke fungiert der Event Hubs-Connector als Client für den MQTT Vermittler von IoT MQ. Wenn Sie den Listenerport und/oder die Authentifizierung Ihres MQTT Vermittlers von IoT MQ angepasst haben, überschreiben Sie auch die lokale MQTT-Verbindungskonfiguration für den Event Hubs-Connector. Weitere Informationen finden Sie unter Lokale Broker-Verbindung der MQTT-Brücke.

KafkaConnectorTopicMap

Mit der benutzerdefinierten Ressource (CR) „KafkaConnectorTopicMap“ können Sie die Zuordnung zwischen MQTT-Themen und Kafka-Themen für die bidirektionale Datenübertragung definieren. Geben Sie einen Verweis auf eine KafkaConnector-CR und eine Liste von Routen an. Jede Route kann entweder eine MQTT-zu-Kafka-Route oder eine Kafka-zu-MQTT-Route sein. Beispiel:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: none
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

Die folgenden Tabelle beschreibt die Felder in der KafkaConnectorTopicMap-CR:

Feld Beschreibung Erforderlich
kafkaConnectorRef Der Name der KafkaConnector-CR, zu der diese Themenzuordnung gehört. Ja
compression Die Konfiguration für die Komprimierung für die Nachrichten, die an Kafka-Themen gesendet werden. Siehe Komprimierung. Nein
Batchverarbeitung Die Konfiguration für die Batchverarbeitung für die Nachrichten, die an Kafka-Themen gesendet werden. Siehe Batchverarbeitung. Nein
partitionStrategy Die Strategie für die Behandlung von Kafka-Partitionen beim Senden von Nachrichten an Kafka-Themen. Siehe Partitionsbehandlungsstrategie. Nein
copyMqttProperties Boolescher Wert zum Steuern, ob MQTT-System- und Benutzereigenschaften in den Kafka-Nachrichtenkopf kopiert werden. Die Benutzereigenschaften werden unverändert kopiert. Einige Transformationen erfolgen mit Systemeigenschaften. Der Standardwert ist „false“. Nein
Routen Eine Liste der Routen für die Datenübertragung zwischen MQTT-Themen und Kafka-Themen. Jede Route kann je nach Datenübertragungsrichtung entweder ein mqttToKafka- oder ein kafkaToMqtt-Feld aufweisen. Siehe Routen. Ja

Komprimierung

Das Komprimierungsfeld ermöglicht die Komprimierung für die Nachrichten, die an Kafka-Themen gesendet werden. Die Komprimierung trägt dazu bei, die für die Datenübertragung erforderliche Netzwerkbandbreite und den Speicherplatz zu reduzieren. Die Komprimierung fügt dem Prozess jedoch auch zusätzlichen Aufwand und Wartezeit hinzu. Die Werte und Unterstützung der Komprimierungstypen sind in der folgenden Tabelle aufgeführt.

Wert Beschreibung Unterstützt
Keine Es wird keine Komprimierung oder Batchverarbeitung angewendet. keine ist der Standardwert, wenn keine Komprimierung angegeben wird. Ja
gzip GZIP-Komprimierung und Batchverarbeitung werden angewendet. GZIP ist ein universeller Komprimierungsalgorithmus, der ein gutes Gleichgewicht zwischen Komprimierungsrate und Geschwindigkeit bietet. Ja. Für die GZIP-Komprimierung ist die Preisstufe Event Hubs Premium erforderlich.
snappy Snappy-Komprimierung und Batchverarbeitung werden angewendet. Snappy ist ein schneller Komprimierungsalgorithmus, der eine moderate Komprimierungsrate und Geschwindigkeit bietet. Wird von Azure Event Hubs nicht unterstützt. Verwenden Sie Apache Kafka.
lz4 LZ4-Komprimierung und Batchverarbeitung werden angewendet. LZ4 ist ein schneller Komprimierungsalgorithmus, der eine niedrige Komprimierungsrate und hohe Geschwindigkeit bietet. Wird von Azure Event Hubs nicht unterstützt. Verwenden Sie Apache Kafka.

Batchverarbeitung

Neben der Komprimierung können Sie auch die Batchverarbeitung für Nachrichten konfigurieren, bevor Sie sie an Kafka-Themen senden. Mit der Batchverarbeitung können Sie mehrere Nachrichten zusammen gruppieren und als einzelne Einheit komprimieren, wodurch die Komprimierungseffizienz verbessert und der Netzwerkaufwand reduziert werden kann.

Feld Beschreibung Erforderlich
enabled Ein boolescher Wert, der angibt, ob die Batchverarbeitung aktiviert ist oder nicht. Wenn keine Festlegung erfolgt, ist der Standardwert FALSE. Ja
latencyMs Das maximale Zeitintervall in Millisekunden, in dem Nachrichten vor dem Senden gepuffert werden können. Wenn dieses Intervall erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie groß sie sind, als Batch gesendet. Wenn nicht festgelegt, ist der Standardwert 5. Nein
maxMessages Die maximale Anzahl von Nachrichten, die vor dem Senden gepuffert werden können. Wenn diese Anzahl erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie groß sie sind oder wie lange sie gepuffert werden, als Batch gesendet. Wenn nicht festgelegt, ist der Standardwert 100.000. Nein
maxBytes Die maximale Größe in Bytes, die vor dem Senden gepuffert werden kann. Wenn diese Größe erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie viele es sind oder wie lange sie gepuffert werden, als Batch gesendet. Der Standardwert ist 1.000.000 (1 MB). Nein

Ein Beispiel für die Verwendung der Batchverarbeitung ist:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Dies bedeutet, dass Nachrichten entweder gesendet werden, wenn 100 Nachrichten im Puffer vorhanden sind, oder wenn 1.024 Bytes im Puffer vorhanden sind oder wenn 1.000 Millisekunden seit dem letzten Senden verstrichen sind, je nachdem, was zuerst eintritt.

Partitionsbehandlungsstrategie

Die Partitionsbehandlungsstrategie ist ein Feature, mit dem Sie steuern können, wie Nachrichten Kafka-Partitionen zugewiesen werden, wenn sie an Kafka-Themen gesendet werden. Kafka-Partitionen sind logische Segmente eines Kafka-Themas, die parallele Verarbeitung und Fehlertoleranz ermöglichen. Jede Nachricht in einem Kafka-Thema verfügt über eine Partition und einen Offset, mit dem die Nachrichten identifiziert und sortiert werden.

Der Kafka-Connector weist Nachrichten standardmäßig zufälligen Partitionen zu, wobei ein Roundrobin-Algorithmus verwendet wird. Sie können jedoch verschiedene Strategien verwenden, um Nachrichten auf der Grundlage bestimmter Kriterien, wie dem MQTT-Themennamen oder einer MQTT-Nachrichteneigenschaft, den Partitionen zuzuweisen. Dadurch können Sie einen besseren Lastausgleich, eine bessere Datenlokalisierung oder eine bessere Nachrichtenanordnung erreichen.

Wert Beschreibung
default Weist Nachrichten standardmäßig zufälligen Partitionen zu, wobei ein Roundrobin-Algorithmus verwendet wird. Dies ist der Standardwert, wenn keine Strategie angegeben wird.
static Weist Nachrichten einer festen Partitionsnummer zu, die von der Instanz-ID des Connectors abgeleitet ist. Dies bedeutet, dass jede Connectorinstanz Nachrichten an eine andere Partition sendet. Dies kann dazu beitragen, einen besseren Lastenausgleich und eine bessere Datenlokalität zu erzielen.
topic Verwendet den MQTT-Themennamen als Schlüssel für die Partitionierung. Dies bedeutet, dass Nachrichten mit demselben MQTT-Themennamen an dieselbe Partition gesendet werden. Dies kann dazu beitragen, eine bessere Nachrichtenanordnung und Datenlokalität zu erzielen.
property Verwendet eine MQTT-Nachrichteneigenschaft als Schlüssel für die Partitionierung. Geben Sie den Namen der Eigenschaft im Feld partitionKeyProperty an. Dies bedeutet, dass Nachrichten mit demselben Eigenschaftswert an dieselbe Partition gesendet werden. Dies kann dazu beitragen, basierend auf einem benutzerdefinierten Kriterium eine bessere Nachrichtenanordnung und Datenlokalität zu erzielen.

Ein Beispiel für die Verwendung der Partitionsbehandlungsstrategie ist:

partitionStrategy: property
partitionKeyProperty: device-id

Dies bedeutet, dass Nachrichten mit derselben Geräte-ID an dieselbe Partition gesendet werden.

Routes

Das Feld „Routen“ definiert eine Liste der Routen für die Datenübertragung zwischen MQTT-Themen und Kafka-Themen. Jede Route kann je nach Datenübertragungsrichtung entweder ein mqttToKafka- oder ein kafkaToMqtt-Feld aufweisen.

MQTT zu Kafka

Das Feld mqttToKafka definiert eine Route, die Daten von einem MQTT-Thema zu einem Kafka-Thema überträgt.

Feld Beschreibung Erforderlich
name Eindeutiger Name für die Route. Ja
mqttTopic Das MQTT-Thema zum Abonnieren. Sie können Platzhalterzeichen (# und +) verwenden, um mehrere Themen abzugleichen. Ja
kafkaTopic Das Kafka-Thema, an das gesendet werden soll. Ja
kafkaAcks Die Anzahl der Bestätigungen, die der Connector vom Kafka-Endpunkt benötigt. Mögliche Werte sind: zero, one oder all. Nein
qos Die QoS-Ebene (Quality of Service) für das MQTT-Themenabonnement. Mögliche Werte sind: 0 oder 1 (Standard). QoS 2 wird derzeit nicht unterstützt. Ja
sharedSubscription Die Konfiguration für die Verwendung gemeinsamer Abonnements für MQTT-Themen. Geben Sie den groupName an, bei dem es sich um einen eindeutigen Bezeichner für eine Gruppe von Abonnenten handelt, und die groupMinimumShareNumber, bei der es sich um die Anzahl der Abonnenten in einer Gruppe handelt, die Nachrichten aus einem Thema empfängt. Wenn „groupName“ beispielsweise „group1“ und „groupMinimumShareNumber“ gleich 3 ist, erstellt der Connector drei Abonnenten mit demselben Gruppennamen, um Nachrichten aus einem Thema zu empfangen. Mit diesem Feature können Sie Nachrichten unter mehreren Abonnenten verteilen, ohne Nachrichten zu verlieren oder Duplikate zu erstellen. Nein

Beispiel für die Verwendung der mqttToKafka-Route:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

In diesem Beispiel werden Nachrichten von MQTT-Themen, die temperature-alerts/# entsprechen, an das Kafka-Thema receiving-event-hub mit QoS-Äquivalent 1 und der gemeinsamen Abonnementgruppe „group1“ mit der Freigabenummer 3 gesendet.

Kafka zu MQTT

Das Feld kafkaToMqtt definiert eine Route, die Daten von einem Kafka-Thema zu einem MQTT-Thema überträgt.

Feld Beschreibung Erforderlich
name Eindeutiger Name für die Route. Ja
kafkaTopic Das Kafka-Thema, aus dem gepullt werden soll. Ja
mqttTopic Das MQTT-Thema, in dem veröffentlicht werden soll. Ja
consumerGroupId Das Präfix der Consumergruppen-ID für jede Kafka-zu MQTT-Route. Wenn nicht festgelegt, wird die Consumergruppen-ID auf denselben Namen wie der Routenname festgelegt. Nein
qos Die QoS-Ebene (Quality of Service) für die im MQTT-Thema veröffentlichten Nachrichten. Mögliche Werte sind 0 oder 1 (Standard). QoS 2 wird derzeit nicht unterstützt. Wenn QoS auf 1 festgelegt ist, veröffentlicht der Connector die Nachricht im MQTT-Thema und wartet dann auf den Aufruf von Ack, bevor die Nachricht wieder an Kafka zurückcommittet wird. Für QoS 0 committet der Connector sofort ohne MQTT-Ack zurück. Nein

Beispiel für die Verwendung der kafkaToMqtt-Route:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

In diesem Beispiel werden Nachrichten aus dem Kafka-Thema sending-event-hub im MQTT-Thema heater-commands mit QoS-Ebene 0 veröffentlicht.

Der Name des Event Hubs muss dem Kafka-Thema entsprechen.

Jeder einzelne Event Hub, nicht der Namespace, muss genau so benannt sein wie das in den Routen angegebene vorgesehene Kafka-Thema. Außerdem muss die Verbindungszeichenfolge EntityPath übereinstimmen, wenn die Verbindungszeichenfolge auf einen Event Hub festgelegt ist. Diese Anforderung ergibt sich daraus, dass der Event Hub-Namespace dem Kafka-Cluster und der Name des Event Hubs einem Kafka-Thema entspricht, sodass der Name des Kafka-Themas dem Namen des Event Hubs entsprechen muss.

Offsets der Kafka-Consumergruppe

Wenn der Connector die Verbindung trennt oder entfernt und mit derselben Kafka-Consumergruppen-ID neu installiert wird, wird der Offset der Consumergruppe (die letzte Position, an welcher der Kafka Consumer Nachrichten hat) in Azure Event Hubs gespeichert. Weitere Informationen finden Sie unter Event Hubs-Consumergruppe im Vergleich zu Kafka-Consumergruppe.

MQTT-Version

Dieser Connector verwendet nur MQTT v5.

Veröffentlichen und Abonnieren von MQTT-Nachrichten mit Azure IoT MQ (Vorschau)