Share via


Üzenetek küldése és fogadása az Azure IoT MQ Preview és az Azure Event Hubs vagy a Kafka között

Fontos

Az Azure IoT Operations Előzetes verziója – az Azure Arc által engedélyezett verzió jelenleg előzetes verzióban érhető el. Ezt az előzetes verziójú szoftvert nem szabad éles környezetben használni.

A bétaverziójú, előzetes verziójú vagy másként még általánosan nem elérhető Azure-szolgáltatások jogi feltételeit lásd: Kiegészítő használati feltételek a Microsoft Azure előzetes verziójú termékeihez.

A Kafka-összekötő leküldi az üzeneteket az Azure IoT MQ Preview MQTT-közvetítőről egy Kafka-végpontra, és hasonlóképpen lekéri az üzeneteket. Mivel az Azure Event Hubs támogatja a Kafka API-t, az összekötő beépítetten működik az Event Hubs használatával.

Event Hubs-összekötő konfigurálása Kafka-végponton keresztül

Alapértelmezés szerint az összekötő nincs telepítve az Azure IoT MQ-val. Explicit módon engedélyezni kell a témakörleképezést és a hitelesítési hitelesítő adatokat. Az alábbi lépéseket követve engedélyezheti az IoT MQ és az Azure Event Hubs közötti kétirányú kommunikációt a Kafka-végponton keresztül.

  1. Hozzon létre egy Event Hubs-névteret.

  2. Hozzon létre egy eseményközpontot minden Kafka-témakörhöz.

Az összekötő hozzáférésének biztosítása az Event Hubs-névtérhez

Az IoT MQ Arc-bővítményeknek az Event Hubs-névtérhez való hozzáférésének biztosítása a legkényelmesebb módja annak, hogy biztonságos kapcsolatot létesítsen az IoT MQ Kakfa-összekötője és az Event Hubs között.

Mentse a következő Bicep-sablont egy fájlba, és alkalmazza azt az Azure CLI-vel a környezet érvényes paramétereinek beállítása után:

Feljegyzés

A Bicep-sablon feltételezi, hogy az Arc connnected fürt és az Event Hubs névtér ugyanabban az erőforráscsoportban található, és módosítsa a sablont, ha a környezete eltérő.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

Kafka Csatlakozás or

A Kafka Csatlakozás or egyéni erőforrás (CR) lehetővé teszi egy Olyan Kafka-összekötő konfigurálását, amely képes kommunikálni egy Kafka-gazdagépet és eseményközpontot. A Kafka-összekötő képes adatokat továbbítani az MQTT-témakörök és a Kafka-témakörök között az Event Hubs kafka-kompatibilis végpontként való használatával.

Az alábbi példa egy Olyan Kafka Csatlakozás or CR-t mutat be, amely az IoT MQ Azure-identitásával csatlakozik egy Event Hubs-végponthoz, feltételezi, hogy más MQ-erőforrások lettek telepítve a gyorsútmutató használatával:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Az alábbi táblázat a Kafka Csatlakozás or CR mezőit ismerteti:

Mező Leírás Kötelező
rendszerkép A Kafka-összekötő képe. Megadhatja a pullPolicyképet és tag a képetrepository. Az alapértelmezett értékek az előző példában jelennek meg. Igen
példányok A Futtatandó Kafka-összekötő példányainak száma. Igen
clientIdPrefix Az összekötő által használt ügyfél-azonosítóra előerősíteni kívánt sztring. Nem
kafka Csatlakozás ion Az Event Hubs-végpont kapcsolati adatai. Lásd: Kafka Csatlakozás ion. Igen
localBroker Csatlakozás ion Az alapértelmezett közvetítőkapcsolatot felülíró helyi közvetítő kapcsolati adatai. Lásd: Helyi közvetítőkapcsolat kezelése. Nem
Loglevel A Kafka-összekötő naplószintje. Lehetséges értékek: nyomkövetés, hibakeresés, információ, figyelmeztetés, hiba vagy végzetes. Az alapértelmezett beállítás figyelmeztetés. Nem

Kafka-kapcsolat

A kafkaConnection mező a Kafka-végpont kapcsolati adatait határozza meg.

Mező Leírás Kötelező
endpoint Az Event Hubs-végpont gazdagépe és portja. A port általában 9093. A bootstrap-kiszolgálók szintaxisának használatához több, vesszővel elválasztott végpontot is megadhat. Igen
Tls A TLS-titkosítás konfigurálása. Lásd: TLS. Igen
hitelesítés A hitelesítés konfigurációja. Lásd: Hitelesítés. Nem

TLS

A tls mező lehetővé teszi a TLS-titkosítást a kapcsolathoz, és opcionálisan megadja a hitelesítésszolgáltató konfigurációs leképezését.

Mező Leírás Kötelező
tlsEnabled Logikai érték, amely azt jelzi, hogy engedélyezve van-e a TLS-titkosítás. Az Event Hubs-kommunikáció esetében igaz értékre kell állítani. Igen
trustedCaCertificateConfigMap A kiszolgáló identitásának ellenőrzésére szolgáló ca-tanúsítványt tartalmazó konfigurációs térkép neve. Ez a mező nem szükséges az Event Hubs-kommunikációhoz, mivel az Event Hubs alapértelmezés szerint megbízható, jól ismert hitelesítésszolgáltatókat használ. Ezt a mezőt azonban akkor használhatja, ha egyéni hitelesítésszolgáltatói tanúsítványt szeretne használni. Nem

Megbízható hitelesítésszolgáltató megadásakor hozzon létre egy ConfigMap-et, amely A CA nyilvános bájitalját tartalmazza PEM formátumban, és adja meg a trustedCaCertificateConfigMap tulajdonság nevét.

kubectl create configmap ca-pem --from-file path/to/ca.pem

Hitelesítés

A hitelesítési mező különböző hitelesítési módszereket támogat, például SASL, X509 vagy felügyelt identitást.

Mező Leírás Kötelező
engedélyezve Logikai érték, amely azt jelzi, hogy engedélyezve van-e a hitelesítés. Igen
authType A használt hitelesítési típust tartalmazó mező. Lásd: Hitelesítési típus
Hitelesítés típusa
Mező Leírás Kötelező
sasl A SASL-hitelesítés konfigurációja. Adja meg az saslTypeegyszerű, scramSha256 vagy scramSha512 típusú titkos kulcsokat, és token hivatkozzon a jelszót tartalmazó KubernetessecretName- vagy Azure Key Vault-titkos keyVault kódra. Igen, ha SASL-hitelesítést használ
systemAssignedManagedIdentity A felügyelt identitás hitelesítésének konfigurációja. Adja meg a jogkivonat-kérelem célközönségét, amelynek meg kell egyeznie az Event Hubs névterével (https://<NAMESPACE>.servicebus.windows.net), mert az összekötő egy Kafka-ügyfél. Ha engedélyezve van, a rendszer automatikusan létrehoz és hozzárendel egy rendszer által hozzárendelt felügyelt identitást az összekötőhöz. Igen, ha felügyelt identitáshitelesítést használ
x509 Az X509-hitelesítés konfigurációja. Adja meg a vagy keyVault a secretName mezőt. A secretName mező annak a titkos kulcsnak a neve, amely az ügyféltanúsítványt és az ügyfélkulcsot tartalmazza PEM formátumban, TLS-titkos kódként tárolva. Igen, ha X509-hitelesítést használ

A Kubernetes-titkos kódok helyett az Azure Key Vault és az keyVault Azure IoT MQ titkos kulcsainak kezeléséhez lásd: Titkos kódok kezelése az Azure Key Vault vagy a Kubernetes titkos kulcsokkal.

Hitelesítés az Event Hubsban

Ha kapcsolati sztring és Kubernetes-titkos kóddal szeretne csatlakozni az Event Hubshoz, használja plain az SASL-típust$ConnectionString, valamint a felhasználónevet és a teljes kapcsolati sztring jelszóként. Először hozza létre a Kubernetes-titkos kódot:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

Ezután hivatkozzon a konfiguráció titkos kódjára:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

Az Azure Key Vault Kubernetes-titkos kódok helyett való használatához hozzon létre egy Azure Key Vault-titkos kulcsot a kapcsolati sztringEndpoint=sb://.., hivatkozzon rávaultSecret, és adja meg a felhasználónevet a konfigurációban leírtak szerint"$ConnectionString".

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

A felügyelt identitás használatához adja meg az egyetlen hitelesítési módszerként. Olyan szerepkört is hozzá kell rendelnie a felügyelt identitáshoz, amely engedélyt ad arra, hogy üzeneteket küldjön és fogadjon az Event Hubstól, például az Azure Event Hubs-adattulajdonostól vagy az Azure Event Hubs-adatküldőtől/-fogadótól. További információ: Alkalmazás hitelesítése Microsoft Entra-azonosítóval az Event Hubs-erőforrások eléréséhez.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

X.509 esetén használja a nyilvános tanúsítványt és a titkos kulcsot tartalmazó Kubernetes TLS-titkos kulcsot.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

Ezután adja meg a secretName konfigurációt.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

Az Azure Key Vault használatához ellenőrizze, hogy a tanúsítvány és a titkos kulcs megfelelően van-e importálva , majd adja meg a hivatkozást a következővel vaultCert: .

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

Vagy ha a teljes lánc bemutatására van szükség, töltse fel a teljes lánc tanúsítványát és kulcsát az AKV-ba PFX-fájlként, és használja helyette a vaultCaChainSecret mezőt.

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

Helyi közvetítőkapcsolat kezelése

Az MQTT-hídhoz hasonlóan az Event Hubs-összekötő is az IoT MQ MQTT-közvetítő ügyfeleként működik. Ha testre szabta az IoT MQ MQTT-közvetítő figyelőportját és/vagy hitelesítését, felülbírálja az Event Hubs-összekötő helyi MQTT-kapcsolatkonfigurációját is. További információ: MQTT-híd helyi közvetítőkapcsolata.

Kafka Csatlakozás orTopicMap

A Kafka Csatlakozás orTopicMap egyéni erőforrás (CR) segítségével meghatározhatja az MQTT-témakörök és a Kafka-témakörök közötti leképezést a kétirányú adatátvitelhez. Adjon meg egy hivatkozást a Kafka Csatlakozás or CR-ra és az útvonalak listájára. Minden útvonal lehet MQTT–Kafka útvonal vagy Kafka–MQTT útvonal. Példa:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

Az alábbi táblázat a Kafka Csatlakozás orTopicMap CR mezőit ismerteti:

Mező Leírás Kötelező
kafka Csatlakozás orRef Annak a Kafka Csatlakozás or CR-nek a neve, amelyhez ez a témakörtérkép tartozik. Igen
tömörítés A Kafka-témaköröknek küldött üzenetek tömörítési konfigurációja. Lásd: Tömörítés. Nem
Adagoló A Kafka-témaköröknek küldött üzenetek kötegelésének konfigurációja. Lásd: Kötegelés. Nem
partitionStrategy A Kafka-partíciók Kafka-témakörökbe küldött üzenetek kezelésére szolgáló stratégia. Lásd: Partíciókezelési stratégia. Nem
copyMqttProperties Logikai érték annak ellenőrzéséhez, hogy az MQTT rendszer és a felhasználói tulajdonságok át lesznek-e másolva a Kafka üzenetfejlécére. A rendszer a felhasználói tulajdonságokat is átmásolja. Néhány átalakítás a rendszertulajdonságokkal történik. Alapértelmezés szerint hamis. Nem
Útvonalak Az MQTT-témakörök és a Kafka-témakörök közötti adatátvitel útvonalainak listája. Minden útvonal rendelkezhet egy mqttToKafka vagy egy kafkaToMqtt mezővel az adatátvitel irányától függően. Lásd: Útvonalak. Igen

Tömörítés

A tömörítési mező lehetővé teszi a Kafka-témaköröknek küldött üzenetek tömörítését. A tömörítés segít csökkenteni az adatátvitelhez szükséges hálózati sávszélességet és tárhelyet. A tömörítés azonban némi többletterhelést és késést is hozzáad a folyamathoz. A támogatott tömörítési típusok az alábbi táblázatban találhatók.

Érték Leírás
Nincs A rendszer nem alkalmaz tömörítést vagy kötegelést. nincs az alapértelmezett érték, ha nincs megadva tömörítés.
gzip A rendszer GZIP-tömörítést és kötegelést alkalmaz. A GZIP egy általános célú tömörítési algoritmus, amely jó egyensúlyt kínál a tömörítési arány és a sebesség között.
Snappy A rendszer dokkolótömörítést és kötegelést alkalmaz. A Snappy egy gyors tömörítési algoritmus, amely mérsékelt tömörítési arányt és sebességet kínál.
lz4 A rendszer LZ4-tömörítést és kötegelést alkalmaz. Az LZ4 egy gyors tömörítési algoritmus, amely alacsony tömörítési arányt és nagy sebességet kínál.

Kötegelés

A tömörítés mellett konfigurálhatja az üzenetek kötegelését is, mielőtt elküldené őket a Kafka-témakörökbe. A kötegelés lehetővé teszi több üzenet csoportosítását és egyetlen egységként való tömörítését, ami javíthatja a tömörítés hatékonyságát és csökkentheti a hálózati terhelést.

Mező Leírás Kötelező
engedélyezve Logikai érték, amely azt jelzi, hogy engedélyezve van-e a kötegelés. Ha nincs beállítva, az alapértelmezett érték hamis. Igen
késésIM-ek Az üzenetek elküldése előtt pufferelhető maximális időintervallum ezredmásodpercben. Ha eléri ezt az időközt, a rendszer az összes pufferelt üzenetet kötegként küldi el, függetlenül attól, hogy hány vagy mekkora. Ha nincs beállítva, az alapértelmezett érték 5. Nem
maxMessages Az elküldött üzenetek maximális száma, amely pufferelhető. Ha eléri ezt a számot, a rendszer az összes pufferelt üzenetet kötegként küldi el, függetlenül attól, hogy mekkora méretűek vagy mennyi ideig vannak pufferelve. Ha nincs beállítva, az alapértelmezett érték 100000. Nem
maxBytes Az elküldés előtt pufferelhető bájtok maximális mérete. Ha eléri ezt a méretet, a rendszer az összes pufferelt üzenetet kötegként küldi el, függetlenül attól, hogy hány vagy mennyi ideig vannak pufferelve. Az alapértelmezett érték 10000000 (1 MB). Nem

Példa a kötegelés használatára:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Ez azt jelenti, hogy az üzeneteket akkor küldi el a rendszer, ha a pufferben 100 üzenet található, vagy ha a pufferben 1024 bájt található, vagy amikor az utolsó küldés óta 1000 ezredmásodperc telt el, attól függően, hogy melyik az első.

Partíciókezelési stratégia

A partíciókezelési stratégia egy olyan funkció, amely lehetővé teszi az üzenetek Kafka-partíciókhoz való hozzárendelésének szabályozását a Kafka-témakörökbe való küldéskor. A Kafka-partíciók egy Kafka-témakör logikai szegmensei, amelyek lehetővé teszik a párhuzamos feldolgozást és a hibatűrést. A Kafka-témakörök minden üzenete rendelkezik egy partícióval és egy eltolással, amely az üzenetek azonosítására és sorrendjére szolgál.

A Kafka-összekötő alapértelmezés szerint ciklikus időszeleteléses algoritmussal rendel üzeneteket véletlenszerű partíciókhoz. Azonban különböző stratégiákat használhat arra, hogy üzeneteket rendeljen a partíciókhoz bizonyos feltételek alapján, például az MQTT-témakör neve vagy egy MQTT üzenettulajdonság alapján. Ez segíthet a terheléselosztás, az adatok helyének vagy az üzenetek sorrendjének jobb elérésében.

Érték Leírás
alapértelmezett Ciklikus időszeleteléses algoritmussal rendeli hozzá az üzeneteket véletlenszerű partíciókhoz. Ez az alapértelmezett érték, ha nincs megadva stratégia.
static Üzeneteket rendel hozzá egy rögzített partíciószámhoz, amely az összekötő példányazonosítójából származik. Ez azt jelenti, hogy minden összekötőpéldány egy másik partícióra küld üzeneteket. Ez segíthet a terheléselosztás és az adat helyének jobb elérésében.
témakör A particionálás kulcsa az MQTT-témakör neve. Ez azt jelenti, hogy az azonos MQTT-témakörnévvel rendelkező üzeneteket a rendszer ugyanarra a partícióra küldi. Ez segíthet a jobb üzenetrendezésben és az adatok helyének javításában.
tulajdonság A particionálás kulcsa egy MQTT üzenettulajdonság. Adja meg a tulajdonság nevét a partitionKeyProperty mezőben. Ez azt jelenti, hogy az azonos tulajdonságértékkel rendelkező üzeneteket a rendszer ugyanarra a partícióra küldi. Ez segíthet az üzenetsorrendezés és az adatok helyének jobb elérésében egy egyéni feltétel alapján.

Példa a partíciókezelési stratégia használatára:

partitionStrategy: property
partitionKeyProperty: device-id

Ez azt jelenti, hogy az azonos eszközazonosító tulajdonsággal rendelkező üzeneteket a rendszer ugyanarra a partícióra küldi.

Útvonalak

Az Útvonalak mező meghatározza az MQTT-témakörök és a Kafka-témakörök közötti adatátvitel útvonalainak listáját. Minden útvonal rendelkezhet egy mqttToKafka vagy egy kafkaToMqtt mezővel az adatátvitel irányától függően.

MQTT–Kafka

A mqttToKafka mező egy útvonalat határoz meg, amely adatokat továbbít egy MQTT-témakörből egy Kafka-témakörbe.

Mező Leírás Szükséges
név Az útvonal egyedi neve. Igen
mqttTopic A feliratkozni kívánt MQTT-témakör. Több témakörhöz helyettesítő karaktereket (# és +) is használhat. Igen
kafkaTopic A Kafka-témakör, amelybe elküldendő. Igen
kafkaAcks Az összekötő által a Kafka-végponttól igényelt nyugtázások száma. A lehetséges értékek a következők: zero , onevagy all. Nem
Qos Az MQTT-témakör-előfizetés szolgáltatásminőségi (QoS) szintje. Lehetséges értékek: 0 vagy 1 (alapértelmezett). A QoS 2 jelenleg nem támogatott. Igen
sharedSubscription A megosztott előfizetések MQTT-témakörökhöz való használatára vonatkozó konfiguráció. Adja meg az groupNameelőfizetők egy csoportjának egyedi azonosítóját, és a groupMinimumShareNumber, amely egy olyan csoport előfizetőinek száma, amely egy témakörből érkező üzeneteket fogad. Ha például a groupName a "group1" és a groupMinimumShareNumber értéke 3, akkor az összekötő három azonos csoportnévvel rendelkező előfizetőt hoz létre, hogy üzeneteket fogadjon egy témakörből. Ezzel a funkcióval üzeneteket oszthat ki több előfizető között anélkül, hogy elveszítené az üzeneteket, vagy duplikációkat hoz létre. Nem

Példa az útvonal használatára mqttToKafka :

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

Ebben a példában a hőmérséklet-riasztásoknak/#-nak megfelelő MQTT-témakörökből érkező üzeneteket a rendszer a Kafka-témakör fogadó-esemény-központba küldi, amelynek QoS-ekvivalens 1- és megosztott előfizetési csoportja "group1" a 3- os megosztási számmal.

Kafka–MQTT

A kafkaToMqtt mező egy olyan útvonalat határoz meg, amely adatokat továbbít egy Kafka-témakörből egy MQTT-témakörbe.

Mező Leírás Szükséges
név Az útvonal egyedi neve. Igen
kafkaTopic A Kafka-témakör, amelyből lekérhet. Igen
mqttTopic A közzéteendő MQTT-témakör. Igen
consumerGroupId Az egyes Kafka-MQTT-útvonalak fogyasztói csoportazonosítójának előtagja. Ha nincs beállítva, a fogyasztói csoport azonosítója megegyezik az útvonal nevével. Nem
Qos Az MQTT-témakörben közzétett üzenetek szolgáltatásminőségi (QoS) szintje. Lehetséges értékek: 0 vagy 1 (alapértelmezett). A QoS 2 jelenleg nem támogatott. Ha a QoS értéke 1, az összekötő közzéteszi az üzenetet az MQTT-témakörben, majd megvárja az üzenetet, mielőtt véglegesíti az üzenetet a Kafkának. A QoS 0 esetében az összekötő azonnal, MQTT ack nélkül véglegesíti a véglegesítést. Nem

Példa az útvonal használatára kafkaToMqtt :

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

Ebben a példában a Kafka topic sending-event-hub* üzenetei a 0. QoS-szintű MQTT-témakörfűtő-parancsokban jelennek meg.

Az eseményközpont nevének meg kell egyeznie a Kafka-témakörnek

Minden egyes eseményközpontnak, amely nem a névtér, pontosan ugyanúgy kell elneveznie, mint az útvonalakban megadott tervezett Kafka-témakörnek. Emellett a kapcsolati sztring EntityPath egyeznie kell, ha kapcsolati sztring hatóköre egy eseményközpontra terjed ki. Ennek a követelménynek az az oka, hogy az Event Hubs-névtér hasonló a Kafka-fürthöz, és az eseményközpont neve hasonló egy Kafka-témakörhöz, ezért a Kafka-témakör nevének meg kell egyeznie az eseményközpont nevével.

Kafka fogyasztói csoport eltolásai

Ha az összekötő leválasztja vagy eltávolítja és újratelepíti ugyanazzal a Kafka fogyasztói csoportazonosítóval, a fogyasztói csoport eltolása (az utolsó hely, ahonnan a Kafka-fogyasztó olvassa az üzeneteket) az Azure Event Hubsban lesz tárolva. További információ: Event Hubs fogyasztói csoport és Kafka fogyasztói csoport.

MQTT-verzió

Ez az összekötő csak MQTT v5-öt használ.

MQTT-üzenetek közzététele és feliratkozása az Azure IoT MQ előzetes verziójával