Senden und Empfangen von Nachrichten zwischen Azure IoT MQ Preview und Azure Event Hubs oder Kafka
Wichtig
Die von Azure Arc aktivierte Azure IoT Operations Preview befindet sich derzeit in der VORSCHAU. Sie sollten diese Vorschausoftware nicht in Produktionsumgebungen verwenden.
Die zusätzlichen Nutzungsbestimmungen für Microsoft Azure-Vorschauen enthalten rechtliche Bedingungen. Sie gelten für diejenigen Azure-Features, die sich in der Beta- oder Vorschauversion befinden oder aber anderweitig noch nicht zur allgemeinen Verfügbarkeit freigegeben sind.
Der Kafka-Connector pusht Nachrichten vom MQTT Vermittler von Azure IoT MQ Preview an einen Kafka-Endpunkt und pullt Nachrichten auf ähnliche Weise umgekehrt ab. Da Azure Event Hubs die Kafka-API unterstützt, funktioniert der Connector standardmäßig sofort mit Event Hubs.
Konfigurieren des Event Hubs-Connectors über den Kafka-Endpunkt
Der Connector wird nicht standardmäßig mit Azure IoT MQ installiert. Er muss explizit aktiviert werden, wobei die Themenzuordnung und die Authentifizierungsanmeldeinformationen angegeben werden. Führen Sie diese Schritte aus, um die bidirektionale Kommunikation zwischen IoT MQ und Azure Event Hubs über den Kafka-Endpunkt zu ermöglichen.
Erstellen Sie einen Event Hub für jedes Kafka-Thema.
Gewähren von Zugriff des Connectors auf den Event-Hubs-Namespace
Das Gewähren des Zugriffs der IoT MQ Arc-Erweiterung auf einen Event Hubs-Namespace ist die bequemste Möglichkeit, eine sichere Verbindung vom Kafka-Connector von IoT MQ zu Event Hubs herzustellen.
Speichern Sie die folgende Bicep-Vorlage in einer Datei, und wenden Sie sie mit der Azure CLI an, nachdem Sie die gültigen Parameter für Ihre Umgebung festgelegt haben:
Hinweis
Die Bicep-Vorlage geht davon aus, dass sich der mit Arc verbundene Cluster und der Event Hubs-Namespace in derselben Ressourcengruppe befinden; passen Sie die Vorlage an, wenn Ihre Umgebung anders ist.
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
}
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
}
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
}
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
KafkaConnector
Mit der benutzerdefinierten Ressource (Custom Resource, CR) KafkaConnector können Sie einen Kafka-Connector konfigurieren, der einen Kafka-Host und Event Hubs kommunizieren kann. Der Kafka-Connector kann Daten zwischen MQTT-Themen und Kafka-Themen übertragen, wobei die Event Hubs als Kafka-kompatibler Endpunkt verwendet werden.
Das folgende Beispiel zeigt eine KafkaConnector-CR, die mithilfe verschiedener Authentifizierungstypen eine Verbindung mit einem Event Hubs-Endpunkt herstellt. Es wird davon ausgegangen, dass andere MQ-Ressourcen mithilfe der Schnellstartanleitung installiert wurden:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
Die folgenden Tabelle beschreibt die Felder in der benutzerdefinierten KafkaConnector-Ressource:
Feld | Beschreibung | Erforderlich |
---|---|---|
image | Das Bild des Kafka-Connectors. Sie können pullPolicy , repository und tag für das Bild angeben. Standardwerte werden im vorherigen Beispiel angezeigt. |
Ja |
instances | Die Anzahl der Instanzen des auszuführenden Kafka-Connectors. | Ja |
clientIdPrefix | Die Zeichenfolge, die einer Client-ID vorangestellt werden soll, die vom Connector verwendet wird. | Nein |
kafkaConnection | Die Verbindungsdetails des Event Hubs-Endpunkts. Siehe Kafka-Verbindung. | Ja |
localBrokerConnection | Die Verbindungsdetails des lokalen Brokers, der die Standardbrokerverbindung außer Kraft setzt. Siehe Verwalten der lokalen Brokerverbindung. | Nein |
logLevel | Die Protokollebene des Kafka-Connectors. Mögliche Werte sind: trace (Ablaufverfolgung), debug (Debug), info (Information), warn (Warnung), error (Fehler) oder fatal (schwerwiegend). Der Standardwert ist warn. | Nein |
Kafka-Verbindung
Das Feld kafkaConnection
definiert die Verbindungsdetails des Kafka-Endpunkts.
Feld | Beschreibung | Erforderlich |
---|---|---|
endpoint | Der Host und Port des Event Hubs-Endpunkts. Der Port ist in der Regel 9093. Sie können mehrere Endpunkte angeben, die durch Kommas getrennt sind, um eine Bootstrapserver-Syntax zu verwenden. | Ja |
tls | Die Konfiguration für die TLS-Verschlüsselung. Siehe TLS. | Ja |
authentication | Die Konfiguration für die Authentifizierung. Siehe Authentifizierung. | Nein |
TLS
Das Feld tls
aktiviert die TLS-Verschlüsselung für die Verbindung und gibt optional eine ConfigMap für die ZS an.
Feld | Beschreibung | Erforderlich |
---|---|---|
tlsEnabled | Ein boolescher Wert, der angibt, ob die TLS-Verschlüsselung aktiviert ist oder nicht. Er muss für die Event Hubs-Kommunikation auf „true“ festgelegt werden. | Ja |
trustedCaCertificateConfigMap | Der Name der ConfigMap, die das ZS-Zertifikat für die Überprüfung der Serveridentität enthält. Dieses Feld ist für die Kommunikation zwischen Event Hubs nicht erforderlich, da Event Hubs bekannte Zertifizierungsstellen verwendet, die standardmäßig als vertrauenswürdig eingestuft werden. Sie können dieses Feld jedoch verwenden, wenn Sie ein benutzerdefiniertes ZS-Zertifikat verwenden möchten. | Nein |
Wenn Sie eine vertrauenswürdige ZS angeben, erstellen Sie eine ConfigMap mit dem öffentlichen Teil der ZS im PEM-Format, und geben Sie den Namen in der trustedCaCertificateConfigMap
-Eigenschaft an.
kubectl create configmap ca-pem --from-file path/to/ca.pem
Authentifizierung
Das Authentifizierungsfeld unterstützt verschiedene Typen von Authentifizierungsmethoden, z. B. SASL, X509 oder verwaltete Identität.
Feld | Beschreibung | Erforderlich |
---|---|---|
enabled | Ein boolescher Wert, der angibt, ob die Authentifizierung aktiviert ist oder nicht. | Ja |
authType | Ein Feld, das den verwendeten Authentifizierungstyp enthält. Siehe Authentifizierungstyp | Ja |
Authentifizierungstyp
Feld | Beschreibung | Erforderlich |
---|---|---|
sasl | Die Konfiguration für die SASL-Authentifizierung. Geben Sie die saslType an, die einfach, scramSha256 oder scramSha512 sein kann, und token , um auf den geheimen Schlüssel von Kubernetes secretName oder den Azure Key Vault keyVault zu verweisen, der das Kennwort enthält. |
Ja, bei Verwendung der SASL-Authentifizierung |
systemAssignedManagedIdentity | Die Konfiguration für die Authentifizierung mit verwalteter Identität. Geben Sie die Zielgruppe für die Tokenanforderung an, die mit dem Event Hubs-Namespace (https://<NAMESPACE>.servicebus.windows.net ) übereinstimmen muss, da der Connector ein Kafka-Client ist. Eine systemseitig zugewiesene verwaltete Identität wird automatisch erstellt und dem Connector zugewiesen, wenn sie aktiviert wird. |
Ja, bei Verwendung der Authentifizierung mit verwalteter Identität |
x509 | Die Konfiguration für die X509-Authentifizierung. Geben Sie das Feld secretName oder keyVault an. Das Feld secretName ist der Name des Geheimnisses, welches das Clientzertifikat und den Clientschlüssel im PEM-Format enthält, gespeichert als TLS-Geheimnis. |
Ja, bei Verwendung der X509-Authentifizierung |
Informationen zum Verwenden von Azure Key Vault und der keyVault
zum Verwalten von Geheimschlüsseln für Azure IoT MQ anstelle von Kubernetes-Geheimnissen finden Sie unter Verwalten von geheimen Schlüsseln mithilfe von Azure Key Vault- oder Kubernetes-Geheimnissen.
Authentifizieren bei Event Hubs
Um eine verwaltete Identität zu verwenden, geben Sie sie als einzige Authentifizierungsmethode an. Außerdem müssen Sie der verwalteten Identität eine Rolle zuweisen, die die Berechtigung zum Senden und Empfangen von Nachrichten von Event Hubs gewährt, z. B. Azure Event Hubs-Datenbesitzer oder Azure Event Hubs-Datenabsender/-empfänger. Weitere Informationen finden Sie unter Authentifizieren einer Anwendung mit Microsoft Entra ID für den Zugriff auf Event Hubs-Ressourcen.
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
Verwalten der lokalen Brokerverbindung
Wie die MQTT-Brücke fungiert der Event Hubs-Connector als Client für den MQTT Vermittler von IoT MQ. Wenn Sie den Listenerport und/oder die Authentifizierung Ihres MQTT Vermittlers von IoT MQ angepasst haben, überschreiben Sie auch die lokale MQTT-Verbindungskonfiguration für den Event Hubs-Connector. Weitere Informationen finden Sie unter Lokale Broker-Verbindung der MQTT-Brücke.
KafkaConnectorTopicMap
Mit der benutzerdefinierten Ressource (CR) „KafkaConnectorTopicMap“ können Sie die Zuordnung zwischen MQTT-Themen und Kafka-Themen für die bidirektionale Datenübertragung definieren. Geben Sie einen Verweis auf eine KafkaConnector-CR und eine Liste von Routen an. Jede Route kann entweder eine MQTT-zu-Kafka-Route oder eine Kafka-zu-MQTT-Route sein. Beispiel:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: none
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
Die folgenden Tabelle beschreibt die Felder in der KafkaConnectorTopicMap-CR:
Feld | Beschreibung | Erforderlich |
---|---|---|
kafkaConnectorRef | Der Name der KafkaConnector-CR, zu der diese Themenzuordnung gehört. | Ja |
compression | Die Konfiguration für die Komprimierung für die Nachrichten, die an Kafka-Themen gesendet werden. Siehe Komprimierung. | Nein |
Batchverarbeitung | Die Konfiguration für die Batchverarbeitung für die Nachrichten, die an Kafka-Themen gesendet werden. Siehe Batchverarbeitung. | Nein |
partitionStrategy | Die Strategie für die Behandlung von Kafka-Partitionen beim Senden von Nachrichten an Kafka-Themen. Siehe Partitionsbehandlungsstrategie. | Nein |
copyMqttProperties | Boolescher Wert zum Steuern, ob MQTT-System- und Benutzereigenschaften in den Kafka-Nachrichtenkopf kopiert werden. Die Benutzereigenschaften werden unverändert kopiert. Einige Transformationen erfolgen mit Systemeigenschaften. Der Standardwert ist „false“. | Nein |
Routen | Eine Liste der Routen für die Datenübertragung zwischen MQTT-Themen und Kafka-Themen. Jede Route kann je nach Datenübertragungsrichtung entweder ein mqttToKafka - oder ein kafkaToMqtt -Feld aufweisen. Siehe Routen. |
Ja |
Komprimierung
Das Komprimierungsfeld ermöglicht die Komprimierung für die Nachrichten, die an Kafka-Themen gesendet werden. Die Komprimierung trägt dazu bei, die für die Datenübertragung erforderliche Netzwerkbandbreite und den Speicherplatz zu reduzieren. Die Komprimierung fügt dem Prozess jedoch auch zusätzlichen Aufwand und Wartezeit hinzu. Die Werte und Unterstützung der Komprimierungstypen sind in der folgenden Tabelle aufgeführt.
Wert | Beschreibung | Unterstützt |
---|---|---|
Keine | Es wird keine Komprimierung oder Batchverarbeitung angewendet. keine ist der Standardwert, wenn keine Komprimierung angegeben wird. | Ja |
gzip | GZIP-Komprimierung und Batchverarbeitung werden angewendet. GZIP ist ein universeller Komprimierungsalgorithmus, der ein gutes Gleichgewicht zwischen Komprimierungsrate und Geschwindigkeit bietet. | Ja. Für die GZIP-Komprimierung ist die Preisstufe Event Hubs Premium erforderlich. |
snappy | Snappy-Komprimierung und Batchverarbeitung werden angewendet. Snappy ist ein schneller Komprimierungsalgorithmus, der eine moderate Komprimierungsrate und Geschwindigkeit bietet. | Wird von Azure Event Hubs nicht unterstützt. Verwenden Sie Apache Kafka. |
lz4 | LZ4-Komprimierung und Batchverarbeitung werden angewendet. LZ4 ist ein schneller Komprimierungsalgorithmus, der eine niedrige Komprimierungsrate und hohe Geschwindigkeit bietet. | Wird von Azure Event Hubs nicht unterstützt. Verwenden Sie Apache Kafka. |
Batchverarbeitung
Neben der Komprimierung können Sie auch die Batchverarbeitung für Nachrichten konfigurieren, bevor Sie sie an Kafka-Themen senden. Mit der Batchverarbeitung können Sie mehrere Nachrichten zusammen gruppieren und als einzelne Einheit komprimieren, wodurch die Komprimierungseffizienz verbessert und der Netzwerkaufwand reduziert werden kann.
Feld | Beschreibung | Erforderlich |
---|---|---|
enabled | Ein boolescher Wert, der angibt, ob die Batchverarbeitung aktiviert ist oder nicht. Wenn keine Festlegung erfolgt, ist der Standardwert FALSE. | Ja |
latencyMs | Das maximale Zeitintervall in Millisekunden, in dem Nachrichten vor dem Senden gepuffert werden können. Wenn dieses Intervall erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie groß sie sind, als Batch gesendet. Wenn nicht festgelegt, ist der Standardwert 5. | Nein |
maxMessages | Die maximale Anzahl von Nachrichten, die vor dem Senden gepuffert werden können. Wenn diese Anzahl erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie groß sie sind oder wie lange sie gepuffert werden, als Batch gesendet. Wenn nicht festgelegt, ist der Standardwert 100.000. | Nein |
maxBytes | Die maximale Größe in Bytes, die vor dem Senden gepuffert werden kann. Wenn diese Größe erreicht ist, werden alle gepufferten Nachrichten unabhängig davon, wie viele es sind oder wie lange sie gepuffert werden, als Batch gesendet. Der Standardwert ist 1.000.000 (1 MB). | Nein |
Ein Beispiel für die Verwendung der Batchverarbeitung ist:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
Dies bedeutet, dass Nachrichten entweder gesendet werden, wenn 100 Nachrichten im Puffer vorhanden sind, oder wenn 1.024 Bytes im Puffer vorhanden sind oder wenn 1.000 Millisekunden seit dem letzten Senden verstrichen sind, je nachdem, was zuerst eintritt.
Partitionsbehandlungsstrategie
Die Partitionsbehandlungsstrategie ist ein Feature, mit dem Sie steuern können, wie Nachrichten Kafka-Partitionen zugewiesen werden, wenn sie an Kafka-Themen gesendet werden. Kafka-Partitionen sind logische Segmente eines Kafka-Themas, die parallele Verarbeitung und Fehlertoleranz ermöglichen. Jede Nachricht in einem Kafka-Thema verfügt über eine Partition und einen Offset, mit dem die Nachrichten identifiziert und sortiert werden.
Der Kafka-Connector weist Nachrichten standardmäßig zufälligen Partitionen zu, wobei ein Roundrobin-Algorithmus verwendet wird. Sie können jedoch verschiedene Strategien verwenden, um Nachrichten auf der Grundlage bestimmter Kriterien, wie dem MQTT-Themennamen oder einer MQTT-Nachrichteneigenschaft, den Partitionen zuzuweisen. Dadurch können Sie einen besseren Lastausgleich, eine bessere Datenlokalisierung oder eine bessere Nachrichtenanordnung erreichen.
Wert | Beschreibung |
---|---|
default | Weist Nachrichten standardmäßig zufälligen Partitionen zu, wobei ein Roundrobin-Algorithmus verwendet wird. Dies ist der Standardwert, wenn keine Strategie angegeben wird. |
static | Weist Nachrichten einer festen Partitionsnummer zu, die von der Instanz-ID des Connectors abgeleitet ist. Dies bedeutet, dass jede Connectorinstanz Nachrichten an eine andere Partition sendet. Dies kann dazu beitragen, einen besseren Lastenausgleich und eine bessere Datenlokalität zu erzielen. |
topic | Verwendet den MQTT-Themennamen als Schlüssel für die Partitionierung. Dies bedeutet, dass Nachrichten mit demselben MQTT-Themennamen an dieselbe Partition gesendet werden. Dies kann dazu beitragen, eine bessere Nachrichtenanordnung und Datenlokalität zu erzielen. |
property | Verwendet eine MQTT-Nachrichteneigenschaft als Schlüssel für die Partitionierung. Geben Sie den Namen der Eigenschaft im Feld partitionKeyProperty an. Dies bedeutet, dass Nachrichten mit demselben Eigenschaftswert an dieselbe Partition gesendet werden. Dies kann dazu beitragen, basierend auf einem benutzerdefinierten Kriterium eine bessere Nachrichtenanordnung und Datenlokalität zu erzielen. |
Ein Beispiel für die Verwendung der Partitionsbehandlungsstrategie ist:
partitionStrategy: property
partitionKeyProperty: device-id
Dies bedeutet, dass Nachrichten mit derselben Geräte-ID an dieselbe Partition gesendet werden.
Routes
Das Feld „Routen“ definiert eine Liste der Routen für die Datenübertragung zwischen MQTT-Themen und Kafka-Themen. Jede Route kann je nach Datenübertragungsrichtung entweder ein mqttToKafka
- oder ein kafkaToMqtt
-Feld aufweisen.
MQTT zu Kafka
Das Feld mqttToKafka
definiert eine Route, die Daten von einem MQTT-Thema zu einem Kafka-Thema überträgt.
Feld | Beschreibung | Erforderlich |
---|---|---|
name | Eindeutiger Name für die Route. | Ja |
mqttTopic | Das MQTT-Thema zum Abonnieren. Sie können Platzhalterzeichen (# und + ) verwenden, um mehrere Themen abzugleichen. |
Ja |
kafkaTopic | Das Kafka-Thema, an das gesendet werden soll. | Ja |
kafkaAcks | Die Anzahl der Bestätigungen, die der Connector vom Kafka-Endpunkt benötigt. Mögliche Werte sind: zero , one oder all . |
Nein |
qos | Die QoS-Ebene (Quality of Service) für das MQTT-Themenabonnement. Mögliche Werte sind: 0 oder 1 (Standard). QoS 2 wird derzeit nicht unterstützt. | Ja |
sharedSubscription | Die Konfiguration für die Verwendung gemeinsamer Abonnements für MQTT-Themen. Geben Sie den groupName an, bei dem es sich um einen eindeutigen Bezeichner für eine Gruppe von Abonnenten handelt, und die groupMinimumShareNumber , bei der es sich um die Anzahl der Abonnenten in einer Gruppe handelt, die Nachrichten aus einem Thema empfängt. Wenn „groupName“ beispielsweise „group1“ und „groupMinimumShareNumber“ gleich 3 ist, erstellt der Connector drei Abonnenten mit demselben Gruppennamen, um Nachrichten aus einem Thema zu empfangen. Mit diesem Feature können Sie Nachrichten unter mehreren Abonnenten verteilen, ohne Nachrichten zu verlieren oder Duplikate zu erstellen. |
Nein |
Beispiel für die Verwendung der mqttToKafka
-Route:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
In diesem Beispiel werden Nachrichten von MQTT-Themen, die temperature-alerts/# entsprechen, an das Kafka-Thema receiving-event-hub mit QoS-Äquivalent 1 und der gemeinsamen Abonnementgruppe „group1“ mit der Freigabenummer 3 gesendet.
Kafka zu MQTT
Das Feld kafkaToMqtt
definiert eine Route, die Daten von einem Kafka-Thema zu einem MQTT-Thema überträgt.
Feld | Beschreibung | Erforderlich |
---|---|---|
name | Eindeutiger Name für die Route. | Ja |
kafkaTopic | Das Kafka-Thema, aus dem gepullt werden soll. | Ja |
mqttTopic | Das MQTT-Thema, in dem veröffentlicht werden soll. | Ja |
consumerGroupId | Das Präfix der Consumergruppen-ID für jede Kafka-zu MQTT-Route. Wenn nicht festgelegt, wird die Consumergruppen-ID auf denselben Namen wie der Routenname festgelegt. | Nein |
qos | Die QoS-Ebene (Quality of Service) für die im MQTT-Thema veröffentlichten Nachrichten. Mögliche Werte sind 0 oder 1 (Standard). QoS 2 wird derzeit nicht unterstützt. Wenn QoS auf 1 festgelegt ist, veröffentlicht der Connector die Nachricht im MQTT-Thema und wartet dann auf den Aufruf von Ack, bevor die Nachricht wieder an Kafka zurückcommittet wird. Für QoS 0 committet der Connector sofort ohne MQTT-Ack zurück. | Nein |
Beispiel für die Verwendung der kafkaToMqtt
-Route:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
In diesem Beispiel werden Nachrichten aus dem Kafka-Thema sending-event-hub im MQTT-Thema heater-commands mit QoS-Ebene 0 veröffentlicht.
Der Name des Event Hubs muss dem Kafka-Thema entsprechen.
Jeder einzelne Event Hub, nicht der Namespace, muss genau so benannt sein wie das in den Routen angegebene vorgesehene Kafka-Thema. Außerdem muss die Verbindungszeichenfolge EntityPath
übereinstimmen, wenn die Verbindungszeichenfolge auf einen Event Hub festgelegt ist. Diese Anforderung ergibt sich daraus, dass der Event Hub-Namespace dem Kafka-Cluster und der Name des Event Hubs einem Kafka-Thema entspricht, sodass der Name des Kafka-Themas dem Namen des Event Hubs entsprechen muss.
Offsets der Kafka-Consumergruppe
Wenn der Connector die Verbindung trennt oder entfernt und mit derselben Kafka-Consumergruppen-ID neu installiert wird, wird der Offset der Consumergruppe (die letzte Position, an welcher der Kafka Consumer Nachrichten hat) in Azure Event Hubs gespeichert. Weitere Informationen finden Sie unter Event Hubs-Consumergruppe im Vergleich zu Kafka-Consumergruppe.
MQTT-Version
Dieser Connector verwendet nur MQTT v5.
Zugehöriger Inhalt
Veröffentlichen und Abonnieren von MQTT-Nachrichten mit Azure IoT MQ (Vorschau)
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für