Odesílání a příjem zpráv mezi Azure IoT MQ Preview a Azure Event Hubs nebo Kafka

Důležité

Azure IoT Operations Preview – Služba Azure Arc je aktuálně ve verzi PREVIEW. Tento software ve verzi Preview byste neměli používat v produkčních prostředích.

Právní podmínky, které platí pro funkce Azure, které jsou ve verzi beta, verzi Preview nebo které zatím nejsou veřejně dostupné, najdete v Dodatečných podmínkách použití pro Microsoft Azure verze Preview.

Konektor Kafka odesílá zprávy z zprostředkovatele MQT MQTT Azure IoT MQTT do koncového bodu Kafka a podobně načítá zprávy jiným způsobem. Vzhledem k tomu, že Azure Event Hubs podporuje rozhraní Kafka API, funguje konektor s Event Hubs automaticky.

Konfigurace konektoru Event Hubs přes koncový bod Kafka

Ve výchozím nastavení se konektor nenainstaluje s Azure IoT MQ. Musí být explicitně povolen s mapováním témat a zadanými přihlašovacími údaji pro ověřování. Pokud chcete povolit obousměrnou komunikaci mezi IoT MQ a Azure Event Hubs prostřednictvím koncového bodu Kafka, postupujte podle těchto kroků.

  1. Vytvořte obor názvů služby Event Hubs.

  2. Vytvořte centrum událostí pro každé téma Kafka.

Udělení přístupu konektoru k oboru názvů služby Event Hubs

Udělení přístupu rozšíření IoT MQ Arc k oboru názvů služby Event Hubs je nejpohodlnější způsob, jak vytvořit zabezpečené připojení z konektoru Kakfa ioT MQ ke službě Event Hubs.

Uložte následující šablonu Bicep do souboru a použijte ji pomocí Azure CLI po nastavení platných parametrů pro vaše prostředí:

Poznámka:

Šablona Bicep předpokládá, že cluster Arc konnnected a obor názvů Event Hubs jsou ve stejné skupině prostředků, upravte šablonu, pokud se vaše prostředí liší.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

Kafka Připojení or

Kafka Připojení or vlastní prostředek (CR) umožňuje nakonfigurovat konektor Kafka, který může komunikovat hostitele Kafka a službu Event Hubs. Konektor Kafka může přenášet data mezi tématy MQTT a tématy Kafka pomocí služby Event Hubs jako koncového bodu kompatibilního se systémem Kafka.

Následující příklad ukazuje cr kafka Připojení or, který se připojuje ke koncovému bodu služby Event Hubs pomocí identity Azure ioT MQ, předpokládá, že se pomocí rychlého startu nainstalovaly další prostředky MQ:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Následující tabulka popisuje pole v systému Kafka Připojení or CR:

Pole Popis Povinní účastníci
image Obrázek konektoru Kafka Můžete zadat , pullPolicyrepositorya tag image. Výchozí hodnoty se zobrazují v předchozím příkladu. Ano
instance Počet instancí konektoru Kafka, které se mají spustit. Ano
clientIdPrefix Řetězec, který se má předem připojit k ID klienta používanému konektorem. No
Kafka Připojení ion Podrobnosti o připojení koncového bodu služby Event Hubs. Viz Připojení Kafka. Ano
localBroker Připojení ion Podrobnosti o připojení místního zprostředkovatele, který přepíše výchozí připojení zprostředkovatele. Viz Správa připojení místního zprostředkovatele. No
Loglevel Úroveň protokolu konektoru Kafka. Možné hodnoty jsou: trasování, ladění, informace, upozornění, chyba nebo závažná chyba. Výchozí hodnota je upozornění. No

Připojení Kafka

Pole kafkaConnection definuje podrobnosti připojení koncového bodu Kafka.

Pole Popis Povinní účastníci
endpoint Hostitel a port koncového bodu služby Event Hubs. Port je obvykle 9093. Můžete zadat více koncových bodů oddělených čárkami pro použití syntaxe serverů bootstrap. Ano
Tls Konfigurace šifrování TLS. Viz PROTOKOL TLS. Ano
ověřování Konfigurace pro ověřování. Viz Ověřování. No

Protokol TLS

Toto tls pole povolí šifrování TLS pro připojení a volitelně určuje mapu konfigurace certifikační autority.

Pole Popis Povinní účastníci
tlsEnabled Logická hodnota, která označuje, jestli je povolené šifrování TLS, nebo ne. Musí být nastavená na hodnotu true pro komunikaci služby Event Hubs. Ano
trustedCaCertificateConfigMap Název mapy konfigurace, která obsahuje certifikát certifikační autority pro ověření identity serveru. Toto pole není vyžadováno pro komunikaci služby Event Hubs, protože služba Event Hubs používá známé certifikační autority, které jsou ve výchozím nastavení důvěryhodné. Toto pole však můžete použít, pokud chcete použít vlastní certifikát certifikační autority. No

Pokud zadáte důvěryhodnou certifikační autoritu, vytvořte objekt ConfigMap obsahující veřejnou adresu CA ve formátu PEM a zadejte název vlastnosti trustedCaCertificateConfigMap .

kubectl create configmap ca-pem --from-file path/to/ca.pem

Ověřování

Ověřovací pole podporuje různé typy metod ověřování, jako jsou SASL, X509 nebo spravovaná identita.

Pole Popis Povinní účastníci
enabled Logická hodnota, která označuje, jestli je povolené nebo ne. Ano
Authtype Pole obsahující použitý typ ověřování. Zobrazit typ ověřování
Typ ověřování
Pole Popis Povinní účastníci
Sasl Konfigurace pro ověřování SASL. saslTypeZadejte , který může být prostý, scramSha256 nebo scramSha512, a token odkazovat na tajný kód Kubernetes secretName nebo Azure Key Vault keyVault obsahující heslo. Ano, pokud používáte ověřování SASL
systemAssignedManagedIdentity Konfigurace pro ověřování spravované identity. Zadejte cílovou skupinu pro požadavek tokenu, která musí odpovídat oboru názvů služby Event Hubs (https://<NAMESPACE>.servicebus.windows.net), protože konektor je klient Kafka. Spravovaná identita přiřazená systémem se automaticky vytvoří a přiřadí konektoru, když je povolená. Ano, pokud používáte ověřování spravovaných identit.
x509 Konfigurace pro ověřování X509. secretName Zadejte pole nebo keyVault pole. Pole secretName je název tajného klíče, který obsahuje klientský certifikát a klíč klienta ve formátu PEM uložený jako tajný klíč TLS. Ano, pokud používáte ověřování X509

Informace o tom, jak používat Azure Key Vault a keyVault spravovat tajné kódy pro Azure IoT MQ místo tajných kódů Kubernetes, najdete v tématu Správa tajných kódů pomocí služby Azure Key Vault nebo tajných kódů Kubernetes.

Ověřování ve službě Event Hubs

Pokud se chcete připojit ke službě Event Hubs pomocí tajného kódu připojovací řetězec a Kubernetes, použijte plain jako heslo typ $ConnectionString SASL a jako uživatelské jméno a úplný připojovací řetězec. Nejprve vytvořte tajný klíč Kubernetes:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

Pak na tajný klíč v konfiguraci odkazujte:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

Pokud chcete místo tajných kódů Kubernetes používat Službu Azure Key Vault, vytvořte tajný klíč služby Azure Key Vault s připojovací řetězecEndpoint=sb://.., na který vaultSecretodkazujete a zadejte uživatelské jméno jako "$ConnectionString" v konfiguraci.

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

Pokud chcete použít spravovanou identitu, zadejte ji jako jedinou metodu v rámci ověřování. Musíte také přiřadit roli spravované identitě, která uděluje oprávnění k odesílání a přijímání zpráv ze služby Event Hubs, jako je vlastník dat služby Azure Event Hubs nebo odesílatel dat nebo příjemce dat služby Azure Event Hubs. Další informace najdete v tématu Ověření aplikace pomocí ID Microsoft Entra pro přístup k prostředkům služby Event Hubs.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

Pro X.509 použijte tajný klíč PROTOKOLU TLS Kubernetes obsahující veřejný certifikát a privátní klíč.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

Pak zadejte secretName konfiguraci.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

Pokud chcete místo toho použít Azure Key Vault, ujistěte se, že se certifikát a privátní klíč správně importují , a pak zadejte odkaz s vaultCert.

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

Nebo pokud se vyžaduje prezentace celého řetězce, nahrajte úplný certifikát řetězu a klíč do AKV jako soubor PFX a místo toho použijte vaultCaChainSecret pole.

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

Správa připojení místního zprostředkovatele

Stejně jako most MQTT funguje konektor Event Hubs jako klient pro zprostředkovatele IoT MQ MQTT. Pokud jste přizpůsobili port naslouchacího procesu nebo ověřování zprostředkovatele IoT MQ MQTT, přepište také konfiguraci místního připojení MQTT konektoru Event Hubs. Další informace najdete v tématu Připojení místního zprostředkovatele MQTT.

Kafka Připojení orTopicMap

Vlastní prostředek Kafka Připojení orTopicMap (CR) umožňuje definovat mapování mezi tématy MQTT a tématy Kafka pro obousměrný přenos dat. Zadejte odkaz na kafka Připojení or CR a seznam tras. Každá trasa může být trasa MQTT na trasu Kafka nebo trasa Kafka do MQTT. Příklad:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

Následující tabulka popisuje pole v systému Kafka Připojení orTopicMap CR:

Pole Popis Povinní účastníci
kafka Připojení orRef Název KAFKA Připojení or CR, do kterého patří tato mapa tématu. Ano
komprese Konfigurace pro kompresi zpráv odeslaných do témat Kafka. Viz Komprese. No
dávkování Konfigurace dávkování pro zprávy odeslané do témat Kafka. Viz Dávkování. No
partitionStrategy Strategie pro zpracování oddílů Kafka při odesílání zpráv do témat Kafka. Viz strategie zpracování oddílů. No
copyMqttProperties Logická hodnota, která určuje, jestli se systémové a uživatelské vlastnosti MQTT zkopírují do hlavičky zprávy Kafka. Vlastnosti uživatele se zkopírují tak, jak jsou. U některých transformací se provádí systémové vlastnosti. Výchozí hodnota je false. No
routes Seznam tras pro přenos dat mezi tématy MQTT a tématy Kafka Každá trasa může mít pole mqttToKafka nebo kafkaToMqtt pole v závislosti na směru přenosu dat. Viz trasy. Ano

Komprese

Pole komprese umožňuje kompresi zpráv odeslaných do témat Kafka. Komprese pomáhá snížit šířku pásma sítě a prostor úložiště vyžadovaný pro přenos dat. Komprese ale také zvyšuje režii a latenci procesu. Podporované typy komprese jsou uvedeny v následující tabulce.

Hodnota Popis
Žádná Nepoužívá se žádná komprese ani dávkování. žádná není výchozí hodnota, pokud není zadána žádná komprese.
gzip Používá se komprese GZIP a dávkování. GZIP je algoritmus komprese pro obecné účely, který nabízí dobrou rovnováhu mezi poměrem komprese a rychlostí.
Elegantní Použije se komprese přichycení a dávkování. Snappy je algoritmus rychlé komprese, který nabízí střední poměr komprese a rychlost.
lz4 Používá se komprese LZ4 a dávkování. LZ4 je algoritmus rychlé komprese, který nabízí nízký poměr komprese a vysokou rychlost.

Dávkování

Kromě komprese můžete také nakonfigurovat dávkování zpráv před jejich odesláním do témat Kafka. Dávkování umožňuje seskupit více zpráv a zkomprimovat je jako jednu jednotku, což může zlepšit efektivitu komprese a snížit režii sítě.

Pole Popis Povinní účastníci
enabled Logická hodnota, která označuje, jestli je povolené dávkování, nebo ne. Pokud není nastavená, výchozí hodnota je false. Ano
latenceMs Maximální časový interval v milisekundách, ve které lze zprávy ukládat do vyrovnávací paměti před odesláním. Pokud je tento interval dosažen, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, kolik nebo kolik jich je. Pokud není nastavená, výchozí hodnota je 5. No
maxMessages Maximální počet zpráv, které lze ukládat do vyrovnávací paměti před odesláním. Pokud je toto číslo dosaženo, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, jak velké jsou nebo jak dlouho se ukládají do vyrovnávací paměti. Pokud není nastavená, výchozí hodnota je 1 00000. No
maxBytes Maximální velikost v bajtech, které lze ukládat do vyrovnávací paměti před odesláním. Pokud je tato velikost dosažená, všechny zprávy ve vyrovnávací paměti se odesílají jako dávka bez ohledu na to, kolik jich je nebo jak dlouho se ukládají do vyrovnávací paměti. Výchozí hodnota je 1 000000 (1 MB). No

Příkladem použití dávkování je:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

To znamená, že zprávy se odesílají buď v případě, že je v vyrovnávací paměti 1000 zpráv, nebo když je v vyrovnávací paměti 1024 bajtů, nebo když od posledního odeslání uplynulo 1000 milisekund, podle toho, co nastane dříve.

Strategie zpracování oddílů

Strategie zpracování oddílů je funkce, která umožňuje řídit, jak se zprávy přiřazují k oddílům Kafka při jejich odesílání do témat Kafka. Oddíly Kafka jsou logické segmenty tématu Kafka, které umožňují paralelní zpracování a odolnost proti chybám. Každá zpráva v tématu Kafka má oddíl a posun, který slouží k identifikaci a řazení zpráv.

Ve výchozím nastavení konektor Kafka přiřazuje zprávy náhodným oddílům pomocí algoritmu kruhového dotazování. Pomocí různých strategií ale můžete přiřadit zprávy k oddílům na základě určitých kritérií, jako je název tématu MQTT nebo vlastnost zprávy MQTT. To vám pomůže dosáhnout lepšího vyrovnávání zatížení, umístění dat nebo řazení zpráv.

Hodnota Popis
default Přiřadí zprávy náhodným oddílům pomocí algoritmu kruhového dotazování. Je to výchozí hodnota, pokud není zadána žádná strategie.
static Přiřadí zprávy k pevnému číslu oddílu odvozeného z ID instance konektoru. To znamená, že každá instance konektoru odesílá zprávy do jiného oddílu. To může pomoct dosáhnout lepšího vyrovnávání zatížení a umístění dat.
topic Jako klíč pro dělení používá název tématu MQTT. To znamená, že zprávy se stejným názvem tématu MQTT se odesílají do stejného oddílu. To může pomoct dosáhnout lepšího pořadí zpráv a umístění dat.
vlastnost Jako klíč pro dělení používá vlastnost zprávy MQTT. Zadejte název vlastnosti v partitionKeyProperty poli. To znamená, že zprávy se stejnou hodnotou vlastnosti se odesílají do stejného oddílu. To může pomoct dosáhnout lepšího pořadí zpráv a umístění dat na základě vlastního kritéria.

Příkladem použití strategie zpracování oddílů je:

partitionStrategy: property
partitionKeyProperty: device-id

To znamená, že zprávy se stejnou vlastností id zařízení se odesílají do stejného oddílu.

Trasy

Pole Trasy definuje seznam tras pro přenos dat mezi tématy MQTT a tématy Kafka. Každá trasa může mít pole mqttToKafka nebo kafkaToMqtt pole v závislosti na směru přenosu dat.

MQTT to Kafka

Pole mqttToKafka definuje trasu, která přenáší data z tématu MQTT do tématu Kafka.

Pole Popis Povinní účastníci
name Jedinečný název trasy. Ano
mqttTopic Téma MQTT pro přihlášení k odběru. Zástupné znaky (# a +) můžete použít ke shodě s více tématy. Ano
kafkaTopic Téma Kafka, do které se má odeslat. Ano
KafkaAcks Počet potvrzení, které konektor vyžaduje z koncového bodu Kafka. Možné hodnoty jsou: zero , onenebo all. No
Qos Úroveň kvality služby (QoS) pro odběr tématu MQTT. Možné hodnoty jsou: 0 nebo 1 (výchozí). QoS 2 se v současné době nepodporuje. Ano
sharedSubscription Konfigurace pro používání sdílených předplatných pro témata MQTT. groupNameZadejte , což je jedinečný identifikátor pro skupinu odběratelů a groupMinimumShareNumber, což je počet odběratelů ve skupině, která přijímá zprávy z tématu. Pokud je například groupName "group1" a groupMinimumShareNumber je 3, konektor vytvoří tři odběratele se stejným názvem skupiny pro příjem zpráv z tématu. Tato funkce umožňuje distribuovat zprávy mezi více odběratelů bez ztráty zpráv nebo vytváření duplicit. No

Příklad použití mqttToKafka trasy:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

V tomto příkladu se zprávy z témat MQTT, která odpovídají upozorněním na teplotu/#, posílají do centra událostí Kafka s ekvivalentním číslem 1 a skupinou sdílených předplatných "group1" s číslem 3.

Kafka do MQTT

Pole kafkaToMqtt definuje trasu, která přenáší data z tématu Kafka do tématu MQTT.

Pole Popis Povinní účastníci
name Jedinečný název trasy. Ano
kafkaTopic Téma Kafka, ze které se má načíst. Ano
mqttTopic Téma MQTT, do které se má publikovat. Ano
consumerGroupId Předpona ID skupiny příjemců pro každou trasu Kafka do MQTT. Pokud není nastavené, ID skupiny příjemců se nastaví na stejné jako název trasy. No
Qos Úroveň kvality služby (QoS) pro zprávy publikované v tématu MQTT. Možné hodnoty jsou 0 nebo 1 (výchozí). QoS 2 se v současné době nepodporuje. Pokud je QoS nastavená na hodnotu 1, konektor publikuje zprávu do tématu MQTT a potom před potvrzením zprávy do systému Kafka počká na akci. V případě QoS 0 se konektor okamžitě potvrdí bez MQTT ack. No

Příklad použití kafkaToMqtt trasy:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

V tomto příkladu se zprávy z tématu Kafka sending-event-hub* publikují do příkazů ohřívače témat MQTT s úrovní QoS 0.

Název centra událostí musí odpovídat tématu Kafka.

Každé samostatné centrum událostí, které není oborem názvů, musí být pojmenováno přesně stejně jako zamýšlené téma Kafka zadané v trasách. Připojovací řetězec se také musí shodovat, EntityPath pokud je připojovací řetězec vymezený na jedno centrum událostí. Tento požadavek spočívá v tom, že obor názvů služby Event Hubs je podobný jako název clusteru Kafka a centra událostí je podobný tématu Kafka, takže název tématu Kafka musí odpovídat názvu centra událostí.

Posuny skupin příjemců Kafka

Pokud se konektor odpojí nebo odebere a přeinstaluje se stejným ID skupiny příjemců Kafka, posun skupiny příjemců (poslední pozice, ze které se ukládají zprávy čtení příjemců Kafka) ve službě Azure Event Hubs. Další informace najdete v tématu Skupina příjemců služby Event Hubs vs. Skupina příjemců Kafka.

Verze MQTT

Tento konektor používá pouze MQTT v5.

Publikování a přihlášení k odběru zpráv MQTT pomocí Azure IoT MQ Preview