Odesílání a příjem zpráv mezi Azure IoT MQ Preview a Azure Event Hubs nebo Kafka
Důležité
Azure IoT Operations Preview – Služba Azure Arc je aktuálně ve verzi PREVIEW. Tento software ve verzi Preview byste neměli používat v produkčních prostředích.
Právní podmínky, které platí pro funkce Azure, které jsou ve verzi beta, verzi Preview nebo které zatím nejsou veřejně dostupné, najdete v Dodatečných podmínkách použití pro Microsoft Azure verze Preview.
Konektor Kafka odesílá zprávy z zprostředkovatele MQT MQTT Azure IoT MQTT do koncového bodu Kafka a podobně načítá zprávy jiným způsobem. Vzhledem k tomu, že Azure Event Hubs podporuje rozhraní Kafka API, funguje konektor s Event Hubs automaticky.
Konfigurace konektoru Event Hubs přes koncový bod Kafka
Ve výchozím nastavení se konektor nenainstaluje s Azure IoT MQ. Musí být explicitně povolen s mapováním témat a zadanými přihlašovacími údaji pro ověřování. Pokud chcete povolit obousměrnou komunikaci mezi IoT MQ a Azure Event Hubs prostřednictvím koncového bodu Kafka, postupujte podle těchto kroků.
Vytvořte obor názvů služby Event Hubs.
Vytvořte centrum událostí pro každé téma Kafka.
Udělení přístupu konektoru k oboru názvů služby Event Hubs
Udělení přístupu rozšíření IoT MQ Arc k oboru názvů služby Event Hubs je nejpohodlnější způsob, jak vytvořit zabezpečené připojení z konektoru Kakfa ioT MQ ke službě Event Hubs.
Uložte následující šablonu Bicep do souboru a použijte ji pomocí Azure CLI po nastavení platných parametrů pro vaše prostředí:
Poznámka:
Šablona Bicep předpokládá, že cluster Arc konnnected a obor názvů Event Hubs jsou ve stejné skupině prostředků, upravte šablonu, pokud se vaše prostředí liší.
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
}
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
}
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
}
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
Kafka Připojení or
Kafka Připojení or vlastní prostředek (CR) umožňuje nakonfigurovat konektor Kafka, který může komunikovat hostitele Kafka a službu Event Hubs. Konektor Kafka může přenášet data mezi tématy MQTT a tématy Kafka pomocí služby Event Hubs jako koncového bodu kompatibilního se systémem Kafka.
Následující příklad ukazuje cr kafka Připojení or, který se připojuje ke koncovému bodu služby Event Hubs pomocí identity Azure ioT MQ, předpokládá, že se pomocí rychlého startu nainstalovaly další prostředky MQ:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
Následující tabulka popisuje pole v systému Kafka Připojení or CR:
Pole | Popis | Povinní účastníci |
---|---|---|
image | Obrázek konektoru Kafka Můžete zadat , pullPolicy repository a tag image. Výchozí hodnoty se zobrazují v předchozím příkladu. |
Ano |
instance | Počet instancí konektoru Kafka, které se mají spustit. | Ano |
clientIdPrefix | Řetězec, který se má předem připojit k ID klienta používanému konektorem. | No |
Kafka Připojení ion | Podrobnosti o připojení koncového bodu služby Event Hubs. Viz Připojení Kafka. | Ano |
localBroker Připojení ion | Podrobnosti o připojení místního zprostředkovatele, který přepíše výchozí připojení zprostředkovatele. Viz Správa připojení místního zprostředkovatele. | No |
Loglevel | Úroveň protokolu konektoru Kafka. Možné hodnoty jsou: trasování, ladění, informace, upozornění, chyba nebo závažná chyba. Výchozí hodnota je upozornění. | No |
Připojení Kafka
Pole kafkaConnection
definuje podrobnosti připojení koncového bodu Kafka.
Pole | Popis | Povinní účastníci |
---|---|---|
endpoint | Hostitel a port koncového bodu služby Event Hubs. Port je obvykle 9093. Můžete zadat více koncových bodů oddělených čárkami pro použití syntaxe serverů bootstrap. | Ano |
Tls | Konfigurace šifrování TLS. Viz PROTOKOL TLS. | Ano |
ověřování | Konfigurace pro ověřování. Viz Ověřování. | No |
Protokol TLS
Toto tls
pole povolí šifrování TLS pro připojení a volitelně určuje mapu konfigurace certifikační autority.
Pole | Popis | Povinní účastníci |
---|---|---|
tlsEnabled | Logická hodnota, která označuje, jestli je povolené šifrování TLS, nebo ne. Musí být nastavená na hodnotu true pro komunikaci služby Event Hubs. | Ano |
trustedCaCertificateConfigMap | Název mapy konfigurace, která obsahuje certifikát certifikační autority pro ověření identity serveru. Toto pole není vyžadováno pro komunikaci služby Event Hubs, protože služba Event Hubs používá známé certifikační autority, které jsou ve výchozím nastavení důvěryhodné. Toto pole však můžete použít, pokud chcete použít vlastní certifikát certifikační autority. | No |
Pokud zadáte důvěryhodnou certifikační autoritu, vytvořte objekt ConfigMap obsahující veřejnou adresu CA ve formátu PEM a zadejte název vlastnosti trustedCaCertificateConfigMap
.
kubectl create configmap ca-pem --from-file path/to/ca.pem
Ověřování
Ověřovací pole podporuje různé typy metod ověřování, jako jsou SASL, X509 nebo spravovaná identita.
Pole | Popis | Povinní účastníci |
---|---|---|
enabled | Logická hodnota, která označuje, jestli je povolené nebo ne. | Ano |
Authtype | Pole obsahující použitý typ ověřování. Zobrazit typ ověřování |
Typ ověřování
Pole | Popis | Povinní účastníci |
---|---|---|
Sasl | Konfigurace pro ověřování SASL. saslType Zadejte , který může být prostý, scramSha256 nebo scramSha512, a token odkazovat na tajný kód Kubernetes secretName nebo Azure Key Vault keyVault obsahující heslo. |
Ano, pokud používáte ověřování SASL |
systemAssignedManagedIdentity | Konfigurace pro ověřování spravované identity. Zadejte cílovou skupinu pro požadavek tokenu, která musí odpovídat oboru názvů služby Event Hubs (https://<NAMESPACE>.servicebus.windows.net ), protože konektor je klient Kafka. Spravovaná identita přiřazená systémem se automaticky vytvoří a přiřadí konektoru, když je povolená. |
Ano, pokud používáte ověřování spravovaných identit. |
x509 | Konfigurace pro ověřování X509. secretName Zadejte pole nebo keyVault pole. Pole secretName je název tajného klíče, který obsahuje klientský certifikát a klíč klienta ve formátu PEM uložený jako tajný klíč TLS. |
Ano, pokud používáte ověřování X509 |
Informace o tom, jak používat Azure Key Vault a keyVault
spravovat tajné kódy pro Azure IoT MQ místo tajných kódů Kubernetes, najdete v tématu Správa tajných kódů pomocí služby Azure Key Vault nebo tajných kódů Kubernetes.
Ověřování ve službě Event Hubs
Pokud se chcete připojit ke službě Event Hubs pomocí tajného kódu připojovací řetězec a Kubernetes, použijte plain
jako heslo typ $ConnectionString
SASL a jako uživatelské jméno a úplný připojovací řetězec. Nejprve vytvořte tajný klíč Kubernetes:
kubectl create secret generic cs-secret -n azure-iot-operations \
--from-literal=username='$ConnectionString' \
--from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'
Pak na tajný klíč v konfiguraci odkazujte:
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
secretName: cs-secret
Pokud chcete místo tajných kódů Kubernetes používat Službu Azure Key Vault, vytvořte tajný klíč služby Azure Key Vault s připojovací řetězecEndpoint=sb://..
, na který vaultSecret
odkazujete a zadejte uživatelské jméno jako "$ConnectionString"
v konfiguraci.
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
keyVault:
username: "$ConnectionString"
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultSecret:
name: my-cs # Endpoint=sb://..
# version: 939ecc2...
Pokud chcete použít spravovanou identitu, zadejte ji jako jedinou metodu v rámci ověřování. Musíte také přiřadit roli spravované identitě, která uděluje oprávnění k odesílání a přijímání zpráv ze služby Event Hubs, jako je vlastník dat služby Azure Event Hubs nebo odesílatel dat nebo příjemce dat služby Azure Event Hubs. Další informace najdete v tématu Ověření aplikace pomocí ID Microsoft Entra pro přístup k prostředkům služby Event Hubs.
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
audience: https://<NAMESPACE>.servicebus.windows.net
X.509
Pro X.509 použijte tajný klíč PROTOKOLU TLS Kubernetes obsahující veřejný certifikát a privátní klíč.
kubectl create secret tls my-tls-secret -n azure-iot-operations \
--cert=path/to/cert/file \
--key=path/to/key/file
Pak zadejte secretName
konfiguraci.
authentication:
enabled: true
authType:
x509:
secretName: my-tls-secret
Pokud chcete místo toho použít Azure Key Vault, ujistěte se, že se certifikát a privátní klíč správně importují , a pak zadejte odkaz s vaultCert
.
authentication:
enabled: true
authType:
x509:
keyVault:
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultCert:
name: my-cert
# version: 939ecc2...
## If presenting full chain also
# vaultCaChainSecret:
# name: my-chain
Nebo pokud se vyžaduje prezentace celého řetězce, nahrajte úplný certifikát řetězu a klíč do AKV jako soubor PFX a místo toho použijte vaultCaChainSecret
pole.
# ...
keyVault:
vaultCaChainSecret:
name: my-cert
# version: 939ecc2...
Správa připojení místního zprostředkovatele
Stejně jako most MQTT funguje konektor Event Hubs jako klient pro zprostředkovatele IoT MQ MQTT. Pokud jste přizpůsobili port naslouchacího procesu nebo ověřování zprostředkovatele IoT MQ MQTT, přepište také konfiguraci místního připojení MQTT konektoru Event Hubs. Další informace najdete v tématu Připojení místního zprostředkovatele MQTT.
Kafka Připojení orTopicMap
Vlastní prostředek Kafka Připojení orTopicMap (CR) umožňuje definovat mapování mezi tématy MQTT a tématy Kafka pro obousměrný přenos dat. Zadejte odkaz na kafka Připojení or CR a seznam tras. Každá trasa může být trasa MQTT na trasu Kafka nebo trasa Kafka do MQTT. Příklad:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: snappy
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
Následující tabulka popisuje pole v systému Kafka Připojení orTopicMap CR:
Pole | Popis | Povinní účastníci |
---|---|---|
kafka Připojení orRef | Název KAFKA Připojení or CR, do kterého patří tato mapa tématu. | Ano |
komprese | Konfigurace pro kompresi zpráv odeslaných do témat Kafka. Viz Komprese. | No |
dávkování | Konfigurace dávkování pro zprávy odeslané do témat Kafka. Viz Dávkování. | No |
partitionStrategy | Strategie pro zpracování oddílů Kafka při odesílání zpráv do témat Kafka. Viz strategie zpracování oddílů. | No |
copyMqttProperties | Logická hodnota, která určuje, jestli se systémové a uživatelské vlastnosti MQTT zkopírují do hlavičky zprávy Kafka. Vlastnosti uživatele se zkopírují tak, jak jsou. U některých transformací se provádí systémové vlastnosti. Výchozí hodnota je false. | No |
routes | Seznam tras pro přenos dat mezi tématy MQTT a tématy Kafka Každá trasa může mít pole mqttToKafka nebo kafkaToMqtt pole v závislosti na směru přenosu dat. Viz trasy. |
Ano |
Komprese
Pole komprese umožňuje kompresi zpráv odeslaných do témat Kafka. Komprese pomáhá snížit šířku pásma sítě a prostor úložiště vyžadovaný pro přenos dat. Komprese ale také zvyšuje režii a latenci procesu. Podporované typy komprese jsou uvedeny v následující tabulce.
Hodnota | Popis |
---|---|
Žádná | Nepoužívá se žádná komprese ani dávkování. žádná není výchozí hodnota, pokud není zadána žádná komprese. |
gzip | Používá se komprese GZIP a dávkování. GZIP je algoritmus komprese pro obecné účely, který nabízí dobrou rovnováhu mezi poměrem komprese a rychlostí. |
Elegantní | Použije se komprese přichycení a dávkování. Snappy je algoritmus rychlé komprese, který nabízí střední poměr komprese a rychlost. |
lz4 | Používá se komprese LZ4 a dávkování. LZ4 je algoritmus rychlé komprese, který nabízí nízký poměr komprese a vysokou rychlost. |
Dávkování
Kromě komprese můžete také nakonfigurovat dávkování zpráv před jejich odesláním do témat Kafka. Dávkování umožňuje seskupit více zpráv a zkomprimovat je jako jednu jednotku, což může zlepšit efektivitu komprese a snížit režii sítě.
Pole | Popis | Povinní účastníci |
---|---|---|
enabled | Logická hodnota, která označuje, jestli je povolené dávkování, nebo ne. Pokud není nastavená, výchozí hodnota je false. | Ano |
latenceMs | Maximální časový interval v milisekundách, ve které lze zprávy ukládat do vyrovnávací paměti před odesláním. Pokud je tento interval dosažen, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, kolik nebo kolik jich je. Pokud není nastavená, výchozí hodnota je 5. | No |
maxMessages | Maximální počet zpráv, které lze ukládat do vyrovnávací paměti před odesláním. Pokud je toto číslo dosaženo, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, jak velké jsou nebo jak dlouho se ukládají do vyrovnávací paměti. Pokud není nastavená, výchozí hodnota je 1 00000. | No |
maxBytes | Maximální velikost v bajtech, které lze ukládat do vyrovnávací paměti před odesláním. Pokud je tato velikost dosažená, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, kolik jich je nebo jak dlouho se ukládají do vyrovnávací paměti. Výchozí hodnota je 1 000000 (1 MB). | No |
Příkladem použití dávkování je:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
To znamená, že zprávy se odesílají buď v případě, že je v vyrovnávací paměti 1000 zpráv, nebo když je v vyrovnávací paměti 1024 bajtů, nebo když od posledního odeslání uplynulo 1000 milisekund, podle toho, co nastane dříve.
Strategie zpracování oddílů
Strategie zpracování oddílů je funkce, která umožňuje řídit, jak se zprávy přiřazují k oddílům Kafka při jejich odesílání do témat Kafka. Oddíly Kafka jsou logické segmenty tématu Kafka, které umožňují paralelní zpracování a odolnost proti chybám. Každá zpráva v tématu Kafka má oddíl a posun, který slouží k identifikaci a řazení zpráv.
Ve výchozím nastavení konektor Kafka přiřazuje zprávy náhodným oddílům pomocí algoritmu kruhového dotazování. Pomocí různých strategií ale můžete přiřadit zprávy k oddílům na základě určitých kritérií, jako je název tématu MQTT nebo vlastnost zprávy MQTT. To vám pomůže dosáhnout lepšího vyrovnávání zatížení, umístění dat nebo řazení zpráv.
Hodnota | Popis |
---|---|
default | Přiřadí zprávy náhodným oddílům pomocí algoritmu kruhového dotazování. Je to výchozí hodnota, pokud není zadána žádná strategie. |
static | Přiřadí zprávy k pevnému číslu oddílu odvozeného z ID instance konektoru. To znamená, že každá instance konektoru odesílá zprávy do jiného oddílu. To může pomoct dosáhnout lepšího vyrovnávání zatížení a umístění dat. |
topic | Jako klíč pro dělení používá název tématu MQTT. To znamená, že zprávy se stejným názvem tématu MQTT se odesílají do stejného oddílu. To může pomoct dosáhnout lepšího pořadí zpráv a umístění dat. |
vlastnost | Jako klíč pro dělení používá vlastnost zprávy MQTT. Zadejte název vlastnosti v partitionKeyProperty poli. To znamená, že zprávy se stejnou hodnotou vlastnosti se odesílají do stejného oddílu. To může pomoct dosáhnout lepšího pořadí zpráv a umístění dat na základě vlastního kritéria. |
Příkladem použití strategie zpracování oddílů je:
partitionStrategy: property
partitionKeyProperty: device-id
To znamená, že zprávy se stejnou vlastností id zařízení se odesílají do stejného oddílu.
Trasy
Pole Trasy definuje seznam tras pro přenos dat mezi tématy MQTT a tématy Kafka. Každá trasa může mít pole mqttToKafka
nebo kafkaToMqtt
pole v závislosti na směru přenosu dat.
MQTT to Kafka
Pole mqttToKafka
definuje trasu, která přenáší data z tématu MQTT do tématu Kafka.
Pole | Popis | Povinní účastníci |
---|---|---|
name | Jedinečný název trasy. | Ano |
mqttTopic | Téma MQTT pro přihlášení k odběru. Zástupné znaky (# a + ) můžete použít ke shodě s více tématy. |
Ano |
kafkaTopic | Téma Kafka, do které se má odeslat. | Ano |
KafkaAcks | Počet potvrzení, které konektor vyžaduje z koncového bodu Kafka. Možné hodnoty jsou: zero , one nebo all . |
No |
Qos | Úroveň kvality služby (QoS) pro odběr tématu MQTT. Možné hodnoty jsou: 0 nebo 1 (výchozí). QoS 2 se v současné době nepodporuje. | Ano |
sharedSubscription | Konfigurace pro používání sdílených předplatných pro témata MQTT. groupName Zadejte , což je jedinečný identifikátor pro skupinu odběratelů a groupMinimumShareNumber , což je počet odběratelů ve skupině, která přijímá zprávy z tématu. Pokud je například groupName "group1" a groupMinimumShareNumber je 3, konektor vytvoří tři odběratele se stejným názvem skupiny pro příjem zpráv z tématu. Tato funkce umožňuje distribuovat zprávy mezi více odběratelů bez ztráty zpráv nebo vytváření duplicit. |
No |
Příklad použití mqttToKafka
trasy:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
V tomto příkladu se zprávy z témat MQTT, která odpovídají upozorněním na teplotu/#, posílají do centra událostí Kafka s ekvivalentním číslem 1 a skupinou sdílených předplatných "group1" s číslem 3.
Kafka do MQTT
Pole kafkaToMqtt
definuje trasu, která přenáší data z tématu Kafka do tématu MQTT.
Pole | Popis | Povinní účastníci |
---|---|---|
name | Jedinečný název trasy. | Ano |
kafkaTopic | Téma Kafka, ze které se má načíst. | Ano |
mqttTopic | Téma MQTT, do které se má publikovat. | Ano |
consumerGroupId | Předpona ID skupiny příjemců pro každou trasu Kafka do MQTT. Pokud není nastavené, ID skupiny příjemců se nastaví na stejné jako název trasy. | No |
Qos | Úroveň kvality služby (QoS) pro zprávy publikované v tématu MQTT. Možné hodnoty jsou 0 nebo 1 (výchozí). QoS 2 se v současné době nepodporuje. Pokud je QoS nastavená na hodnotu 1, konektor publikuje zprávu do tématu MQTT a potom před potvrzením zprávy do systému Kafka počká na akci. V případě QoS 0 se konektor okamžitě potvrdí bez MQTT ack. | No |
Příklad použití kafkaToMqtt
trasy:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
V tomto příkladu se zprávy z tématu Kafka sending-event-hub* publikují do příkazů ohřívače témat MQTT s úrovní QoS 0.
Název centra událostí musí odpovídat tématu Kafka.
Každé samostatné centrum událostí, které není oborem názvů, musí být pojmenováno přesně stejně jako zamýšlené téma Kafka zadané v trasách. Připojovací řetězec se také musí shodovat, EntityPath
pokud je připojovací řetězec vymezený na jedno centrum událostí. Tento požadavek spočívá v tom, že obor názvů služby Event Hubs je podobný jako název clusteru Kafka a centra událostí je podobný tématu Kafka, takže název tématu Kafka musí odpovídat názvu centra událostí.
Posuny skupin příjemců Kafka
Pokud se konektor odpojí nebo odebere a přeinstaluje se stejným ID skupiny příjemců Kafka, posun skupiny příjemců (poslední pozice, ze které se ukládají zprávy čtení příjemců Kafka) ve službě Azure Event Hubs. Další informace najdete v tématu Skupina příjemců služby Event Hubs vs. Skupina příjemců Kafka.
Verze MQTT
Tento konektor používá pouze MQTT v5.
Související obsah
Publikování a přihlášení k odběru zpráv MQTT pomocí Azure IoT MQ Preview