Udostępnij za pośrednictwem


Wysyłanie i odbieranie komunikatów między usługą Azure IoT MQ w wersji zapoznawczej a usługą Azure Event Hubs lub platformą Kafka

Ważne

Usługa Azure IoT Operations Preview — włączona przez usługę Azure Arc jest obecnie dostępna w wersji zapoznawczej. Nie należy używać tego oprogramowania w wersji zapoznawczej w środowiskach produkcyjnych.

Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.

Łącznik platformy Kafka wypycha komunikaty z brokera MQTT usługi Azure IoT MQQ w wersji zapoznawczej do punktu końcowego platformy Kafka i podobnie ściąga komunikaty w inny sposób. Ponieważ usługa Azure Event Hubs obsługuje interfejs API platformy Kafka, łącznik działa gotowe do użycia z usługą Event Hubs.

Konfigurowanie łącznika usługi Event Hubs za pośrednictwem punktu końcowego platformy Kafka

Domyślnie łącznik nie jest instalowany z usługą Azure IoT MQ. Musi być jawnie włączona z określonymi poświadczeniami mapowania tematów i uwierzytelniania. Wykonaj następujące kroki, aby włączyć dwukierunkową komunikację między usługą IoT MQ i usługą Azure Event Hubs za pośrednictwem punktu końcowego platformy Kafka.

  1. Utwórz przestrzeń nazw usługi Event Hubs.

  2. Utwórz centrum zdarzeń dla każdego tematu platformy Kafka.

Udzielanie łącznikowi dostępu do przestrzeni nazw usługi Event Hubs

Udzielanie dostępu rozszerzenia usługi IoT MQ Arc do przestrzeni nazw usługi Event Hubs to najwygodniejszy sposób nawiązywania bezpiecznego połączenia z łącznika Kakfa usługi IoT do usługi Event Hubs.

Zapisz następujący szablon Bicep w pliku i zastosuj go za pomocą interfejsu wiersza polecenia platformy Azure po ustawieniu prawidłowych parametrów dla środowiska:

Uwaga

Szablon Bicep zakłada, że klaster usługi Arc connnected i przestrzeń nazw usługi Event Hubs znajdują się w tej samej grupie zasobów, dostosuj szablon, jeśli środowisko jest inne.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

KafkaConnector

Zasób niestandardowy platformy KafkaConnector (CR) umożliwia skonfigurowanie łącznika platformy Kafka, który może komunikować się z hostem platformy Kafka i usługą Event Hubs. Łącznik platformy Kafka może przesyłać dane między tematami MQTT i tematami platformy Kafka przy użyciu usługi Event Hubs jako punktu końcowego zgodnego z platformą Kafka.

Poniższy przykład przedstawia cr platformy KafkaConnector , który łączy się z punktem końcowym usługi Event Hubs przy użyciu różnych typów uwierzytelniania. Przyjęto założenie, że inne zasoby MQ zostały zainstalowane przy użyciu przewodnika Szybki start:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

W poniższej tabeli opisano pola w zasobie niestandardowym platformy KafkaConnector:

Pole opis Wymagania
obraz Obraz łącznika platformy Kafka. Możesz określić wartości pullPolicy, repositoryi tag obrazu. Wartości domyślne są wyświetlane w poprzednim przykładzie. Tak
wystąpienia Liczba wystąpień łącznika platformy Kafka do uruchomienia. Tak
clientIdPrefix Ciąg, który ma być poprzedzony identyfikatorem klienta używanym przez łącznik. Nie.
kafkaConnection Szczegóły połączenia punktu końcowego usługi Event Hubs. Zobacz Połączenie platformy Kafka. Tak
localBrokerConnection Szczegóły połączenia lokalnego brokera, który zastępuje domyślne połączenie brokera. Zobacz Zarządzanie lokalnym połączeniem brokera. Nie.
logLevel Poziom dziennika łącznika platformy Kafka. Możliwe wartości to: ślad, debugowanie, informacje, ostrzeżenie, błąd lub krytyczny. Wartość domyślna to ostrzeżenie. Nie.

Połączenie platformy Kafka

Pole kafkaConnection definiuje szczegóły połączenia punktu końcowego platformy Kafka.

Pole opis Wymagania
endpoint Host i port punktu końcowego usługi Event Hubs. Port jest zazwyczaj 9093. Można określić wiele punktów końcowych rozdzielonych przecinkami, aby użyć składni serwerów bootstrap. Tak
tls Konfiguracja szyfrowania TLS. Zobacz TLS. Tak
uwierzytelnianie Konfiguracja uwierzytelniania. Zobacz Uwierzytelnianie. Nie.

TLS

Pole tls włącza szyfrowanie TLS dla połączenia i opcjonalnie określa mapę konfiguracji urzędu certyfikacji.

Pole opis Wymagania
tlsEnabled Wartość logiczna wskazująca, czy szyfrowanie TLS jest włączone, czy nie. Dla komunikacji z usługą Event Hubs musi być ustawiona wartość true. Tak
trustedCaCertificateConfigMap Nazwa mapy konfiguracji zawierającej certyfikat urzędu certyfikacji do weryfikowania tożsamości serwera. To pole nie jest wymagane w przypadku komunikacji z usługą Event Hubs, ponieważ usługa Event Hubs domyślnie używa dobrze znanych urzędów certyfikacji, które są domyślnie zaufane. Można jednak użyć tego pola, jeśli chcesz użyć niestandardowego certyfikatu urzędu certyfikacji. Nie.

Podczas określania zaufanego urzędu certyfikacji należy utworzyć obiekt ConfigMap zawierający publiczną eliksir urzędu certyfikacji w formacie PEM i określić nazwę we trustedCaCertificateConfigMap właściwości .

kubectl create configmap ca-pem --from-file path/to/ca.pem

Uwierzytelnianie

Pole uwierzytelniania obsługuje różne typy metod uwierzytelniania, takich jak SASL, X509 lub tożsamość zarządzana.

Pole opis Wymagania
enabled Wartość logiczna wskazująca, czy uwierzytelnianie jest włączone, czy nie. Tak
authType Pole zawierające używany typ uwierzytelniania. Zobacz Typ uwierzytelniania Tak
Typ uwierzytelniania
Pole opis Wymagania
sasl Konfiguracja uwierzytelniania SASL. Określ element saslType, który może być zwykły, scramSha256 lub scramSha512, oraz token odwołać się do wpisu tajnego platformy Kubernetes secretName lub usługi Azure Key Vault keyVault zawierającego hasło. Tak, jeśli używasz uwierzytelniania SASL
systemAssignedManagedIdentity Konfiguracja uwierzytelniania tożsamości zarządzanej. Określ odbiorców żądania tokenu, który musi być zgodny z przestrzenią nazw usługi Event Hubs (https://<NAMESPACE>.servicebus.windows.net), ponieważ łącznik jest klientem platformy Kafka. Tożsamość zarządzana przypisana przez system jest tworzona automatycznie i przypisywana do łącznika po jej włączeniu. Tak, jeśli używasz uwierzytelniania tożsamości zarządzanej
x509 Konfiguracja uwierzytelniania X509. secretName Określ pole lubkeyVault. Pole secretName to nazwa wpisu tajnego, który zawiera certyfikat klienta i klucz klienta w formacie PEM, przechowywany jako wpis tajny PROTOKOŁU TLS. Tak, jeśli używasz uwierzytelniania X509

Aby dowiedzieć się, jak używać usługi Azure Key Vault i poleceń keyVault do zarządzania wpisami tajnymi dla usługi Azure IoT MQ zamiast wpisów tajnych platformy Kubernetes, zobacz Manage secrets using Azure Key Vault or Kubernetes secrets (Zarządzanie wpisami tajnymi przy użyciu usługi Azure Key Vault lub Kubernetes).

Uwierzytelnianie w usłudze Event Hubs

Aby użyć tożsamości zarządzanej, określ ją jako jedyną metodę w ramach uwierzytelniania. Musisz również przypisać rolę do tożsamości zarządzanej, która udziela uprawnień do wysyłania i odbierania komunikatów z usługi Event Hubs, takich jak właściciel danych usługi Azure Event Hubs lub nadawca/odbiornik danych usługi Azure Event Hubs. Aby dowiedzieć się więcej, zobacz Uwierzytelnianie aplikacji przy użyciu identyfikatora Entra firmy Microsoft w celu uzyskania dostępu do zasobów usługi Event Hubs.

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Zarządzanie lokalnym połączeniem brokera

Podobnie jak mostek MQTT, łącznik usługi Event Hubs działa jako klient brokera MQTT IoT MQTT. Jeśli dostosowano port odbiornika i/lub uwierzytelnianie brokera IoT MQTT, przesłoń również lokalną konfigurację połączenia MQTT dla łącznika usługi Event Hubs. Aby dowiedzieć się więcej, zobacz Połączenie lokalnego brokera mostka MQTT.

KafkaConnectorTopicMap

Zasób niestandardowy platformy KafkaConnectorTopicMap (CR) umożliwia zdefiniowanie mapowania między tematami MQTT i tematami platformy Kafka na potrzeby dwukierunkowego transferu danych. Określ odwołanie do cr platformy KafkaConnector i listę tras. Każda trasa może być trasą MQTT na platformie Kafka lub trasą platformy Kafka do MQTT. Na przykład:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: none
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

W poniższej tabeli opisano pola w pliku CR platformy KafkaConnectorTopicMap:

Pole opis Wymagania
kafkaConnectorRef Nazwa cr platformy KafkaConnector, do którego należy ta mapa tematu. Tak
kompresja Konfiguracja kompresji komunikatów wysyłanych do tematów platformy Kafka. Zobacz Kompresja. Nie.
przetwarzanie wsadowe Konfiguracja przetwarzania wsadowego komunikatów wysyłanych do tematów platformy Kafka. Zobacz Batching (Przetwarzanie wsadowe). Nie.
partitionStrategy Strategia obsługi partycji platformy Kafka podczas wysyłania komunikatów do tematów platformy Kafka. Zobacz Strategia obsługi partycji. Nie.
copyMqttProperties Wartość logiczna do kontrolowania, czy system MQTT i właściwości użytkownika są kopiowane do nagłówka komunikatu platformy Kafka. Właściwości użytkownika są kopiowane zgodnie z rzeczywistym użyciem. Niektóre przekształcenia są wykonywane z właściwościami systemu. Wartość domyślna to false. Nie.
routes Lista tras transferu danych między tematami MQTT i tematami platformy Kafka. Każda trasa może mieć mqttToKafka pole lub kafkaToMqtt w zależności od kierunku transferu danych. Zobacz Trasy. Tak

Kompresja

Pole kompresji umożliwia kompresję komunikatów wysyłanych do tematów platformy Kafka. Kompresja pomaga zmniejszyć przepustowość sieci i miejsce do magazynowania wymagane do transferu danych. Jednak kompresja dodaje również pewne obciążenie i opóźnienie do procesu. Wartości i obsługa typów kompresji są wymienione w poniższej tabeli.

Wartość Opis Obsługiwane
Brak Nie zastosowano kompresji ani dzielenia na partie. brak jest wartością domyślną, jeśli nie określono kompresji. Tak
gzip Zastosowano kompresję GZIP i przetwarzanie wsadowe. GZIP to algorytm kompresji ogólnego przeznaczenia, który zapewnia dobrą równowagę między współczynnikiem kompresji a szybkością. Tak. Warstwa cenowa usługi Event Hubs Premium jest wymagana do kompresji GZIP.
Żwawy Zastosowano kompresję Snappy i przetwarzanie wsadowe. Snappy to szybki algorytm kompresji, który oferuje umiarkowany współczynnik kompresji i szybkość. Usługa Azure Event Hubs nie jest obsługiwana. Korzystanie z platformy Apache Kafka.
lz4 Stosuje się kompresję LZ4 i przetwarzanie wsadowe. LZ4 to szybki algorytm kompresji, który oferuje niski współczynnik kompresji i dużą szybkość. Usługa Azure Event Hubs nie jest obsługiwana. Korzystanie z platformy Apache Kafka.

Dzielenie na partie

Oprócz kompresji można również skonfigurować przetwarzanie wsadowe dla komunikatów przed wysłaniem ich do tematów platformy Kafka. Przetwarzanie wsadowe pozwala grupować wiele komunikatów i kompresować je jako pojedynczą jednostkę, co może zwiększyć wydajność kompresji i zmniejszyć obciążenie sieci.

Pole opis Wymagania
enabled Wartość logiczna wskazująca, czy przetwarzanie wsadowe jest włączone, czy nie. Jeśli nie zostanie ustawiona, wartość domyślna to false. Tak
latencyMs Maksymalny przedział czasu w milisekundach, który można buforować przed wysłaniem komunikatów. Jeśli ten interwał zostanie osiągnięty, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od liczby lub wielkości tych komunikatów. Jeśli nie zostanie ustawiona, wartość domyślna to 5. Nie.
maxMessages Maksymalna liczba komunikatów, które można buforować przed wysłaniem. Jeśli ta liczba zostanie osiągnięta, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od tego, jak duże są lub jak długo są buforowane. Jeśli nie zostanie ustawiona, wartość domyślna to 100000. Nie.
maxBytes Maksymalny rozmiar bajtów, który można buforować przed wysłaniem. Jeśli ten rozmiar zostanie osiągnięty, wszystkie buforowane komunikaty są wysyłane jako partia, niezależnie od tego, ile są lub jak długo są buforowane. Wartość domyślna to 10000000 (1 MB). Nie.

Przykładem użycia przetwarzania wsadowego jest:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Oznacza to, że komunikaty są wysyłane, gdy w buforze znajduje się 100 komunikatów lub gdy w buforze znajduje się 1024 bajty, albo gdy upłynie 1000 milisekund od ostatniego wysłania, w zależności od tego, co nastąpi wcześniej.

Strategia obsługi partycji

Strategia obsługi partycji to funkcja umożliwiająca kontrolowanie sposobu przypisywanego komunikatów do partycji platformy Kafka podczas wysyłania ich do tematów platformy Kafka. Partycje platformy Kafka to logiczne segmenty tematu platformy Kafka, które umożliwiają równoległe przetwarzanie i odporność na uszkodzenia. Każdy komunikat w temacie platformy Kafka ma partycję i przesunięcie, które są używane do identyfikowania i porządkowenia komunikatów.

Domyślnie łącznik platformy Kafka przypisuje komunikaty do losowych partycji przy użyciu algorytmu działania okrężnego. Można jednak użyć różnych strategii, aby przypisać komunikaty do partycji na podstawie niektórych kryteriów, takich jak nazwa tematu MQTT lub właściwość komunikatu MQTT. Może to pomóc w lepszej równoważeniu obciążenia, lokalności danych lub porządkoweniu komunikatów.

Wartość Opis
domyślna Przypisuje komunikaty do losowych partycji przy użyciu algorytmu działania okrężnego. Jest to wartość domyślna, jeśli nie określono żadnej strategii.
static Przypisuje komunikaty do stałego numeru partycji pochodzącego z identyfikatora wystąpienia łącznika. Oznacza to, że każde wystąpienie łącznika wysyła komunikaty do innej partycji. Może to pomóc w osiągnięciu lepszego równoważenia obciążenia i lokalizacji danych.
topic Używa nazwy tematu MQTT jako klucza do partycjonowania. Oznacza to, że komunikaty o tej samej nazwie tematu MQTT są wysyłane do tej samej partycji. Może to pomóc w osiągnięciu lepszej kolejności komunikatów i lokalizacji danych.
właściwość Używa właściwości komunikatu MQTT jako klucza do partycjonowania. Określ nazwę właściwości w partitionKeyProperty polu. Oznacza to, że komunikaty o tej samej wartości właściwości są wysyłane do tej samej partycji. Może to pomóc w lepszej kolejności komunikatów i lokalności danych na podstawie kryterium niestandardowego.

Przykładem użycia strategii obsługi partycji jest:

partitionStrategy: property
partitionKeyProperty: device-id

Oznacza to, że komunikaty z tą samą właściwością device-id są wysyłane do tej samej partycji.

Trasy

Pole routes definiuje listę tras na potrzeby transferu danych między tematami MQTT i tematami platformy Kafka. Każda trasa może mieć mqttToKafka pole lub kafkaToMqtt w zależności od kierunku transferu danych.

MQTT do platformy Kafka

Pole mqttToKafka definiuje trasę, która przesyła dane z tematu MQTT do tematu platformy Kafka.

Pole opis Wymagania
name Unikatowa nazwa trasy. Tak
mqttTopic Temat MQTT do subskrybowania. Można użyć symboli wieloznacznych (# i +) do dopasowania wielu tematów. Tak
kafkaTopic Temat platformy Kafka do wysłania. Tak
kafkaAcks Liczba potwierdzań, których łącznik wymaga z punktu końcowego platformy Kafka. Możliwe wartości to: zero , onelub all. Nie.
qos Poziom jakości usług (QoS) dla subskrypcji tematu MQTT. Możliwe wartości to: 0 lub 1 (wartość domyślna). Usługa QoS 2 nie jest obecnie obsługiwana. Tak
sharedSubscription Konfiguracja korzystania z subskrypcji udostępnionych dla tematów MQTT. groupNameOkreśl element , który jest unikatowym identyfikatorem grupy subskrybentów, i groupMinimumShareNumber, czyli liczbą subskrybentów w grupie, która odbiera komunikaty z tematu. Jeśli na przykład parametr groupName to "group1", a parametr groupMinimumShareNumber to 3, łącznik tworzy trzech subskrybentów o tej samej nazwie grupy w celu odbierania komunikatów z tematu. Ta funkcja umożliwia dystrybucję komunikatów między wieloma subskrybentami bez utraty komunikatów ani tworzenia duplikatów. Nie.

Przykład użycia mqttToKafka trasy:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

W tym przykładzie komunikaty z tematów MQTT, które pasują do alertów temperatury/# są wysyłane do tematu platformy Kafka odbierającego centrum zdarzeń z równoważną funkcją QoS 1 i udostępnioną grupą subskrypcji "group1" z numerem 3.

Kafka do MQTT

Pole kafkaToMqtt definiuje trasę, która przesyła dane z tematu platformy Kafka do tematu MQTT.

Pole opis Wymagania
name Unikatowa nazwa trasy. Tak
kafkaTopic Temat platformy Kafka do pobrania. Tak
mqttTopic Temat MQTT do opublikowania. Tak
consumerGroupId Prefiks identyfikatora grupy odbiorców dla każdej trasy platformy Kafka do MQTT. Jeśli nie zostanie ustawiona, identyfikator grupy odbiorców jest ustawiony na taki sam jak nazwa trasy. Nie.
qos Poziom jakości usług (QoS) dla komunikatów opublikowanych w temacie MQTT. Możliwe wartości to 0 lub 1 (wartość domyślna). Usługa QoS 2 nie jest obecnie obsługiwana. Jeśli wartość QoS jest ustawiona na 1, łącznik publikuje komunikat w temacie MQTT, a następnie czeka nack przed zatwierdzeniem komunikatu z powrotem do platformy Kafka. W przypadku usługi QoS 0 łącznik zatwierdza natychmiast bez ACK MQTT. Nie.

Przykład użycia kafkaToMqtt trasy:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

W tym przykładzie komunikaty z tematu platformy Kafka wysyłające-centrum zdarzeń są publikowane w poleceniach podgrzewacza tematów MQTT z poziomem QoS 0.

Nazwa centrum zdarzeń musi być zgodna z tematem platformy Kafka

Każde pojedyncze centrum zdarzeń, a nie przestrzeń nazw, musi być nazwana dokładnie tak samo jak zamierzony temat platformy Kafka określony w trasach. Ponadto parametry połączenia EntityPath musi być zgodna z zakresem parametry połączenia do jednego centrum zdarzeń. To wymaganie jest spowodowane tym, że przestrzeń nazw usługi Event Hubs jest analogiczna do nazwy klastra Kafka, a nazwa centrum zdarzeń jest analogiczna do tematu platformy Kafka, więc nazwa tematu platformy Kafka musi być zgodna z nazwą centrum zdarzeń.

Przesunięcia grupy odbiorców platformy Kafka

Jeśli łącznik zostanie odłączony lub zostanie usunięty i ponownie zainstalowany z tym samym identyfikatorem grupy odbiorców platformy Kafka, przesunięcie grupy odbiorców (ostatnia pozycja, z której użytkownik platformy Kafka odczytuje komunikaty) jest przechowywana w usłudze Azure Event Hubs. Aby dowiedzieć się więcej, zobacz Grupa odbiorców usługi Event Hubs a grupa odbiorców platformy Kafka.

Wersja MQTT

Ten łącznik używa tylko protokołu MQTT v5.

Publikowanie i subskrybowanie komunikatów MQTT przy użyciu usługi Azure IoT MQ (wersja zapoznawcza)