Wysyłanie danych z usługi Azure IoT MQ (wersja zapoznawcza) do usługi Data Lake Storage
Ważne
Usługa Azure IoT Operations Preview — włączona przez usługę Azure Arc jest obecnie dostępna w wersji zapoznawczej. Nie należy używać tego oprogramowania w wersji zapoznawczej w środowiskach produkcyjnych.
Zobacz Dodatkowe warunki użytkowania wersji zapoznawczych platformy Microsoft Azure, aby zapoznać się z postanowieniami prawnymi dotyczącymi funkcji platformy Azure, które są w wersji beta lub wersji zapoznawczej albo w inny sposób nie zostały jeszcze wydane jako ogólnie dostępne.
Łącznik data lake umożliwia wysyłanie danych z brokera usługi Azure IoT MQ w wersji zapoznawczej do magazynu typu data lake, takiego jak Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake i Azure Data Explorer. Łącznik subskrybuje tematy MQTT i pozyskuje komunikaty do tabel delty na koncie usługi Data Lake Storage.
Wymagania wstępne
Konto usługi Data Lake Storage na platformie Azure z kontenerem i folderem danych. Aby uzyskać więcej informacji na temat tworzenia usługi Data Lake Storage, użyj jednej z następujących opcji przewodnika Szybki start:
- Przewodnik Szybki start dotyczący usługi Microsoft Fabric OneLake:
- Utwórz obszar roboczy, ponieważ domyślny obszar roboczy nie jest obsługiwany.
- Utwórz jezioro.
- Przewodnik Szybki start dla usługi Azure Data Lake Storage Gen2:
- Utwórz konto magazynu do użycia z Azure Data Lake Storage Gen2.
- Klaster usługi Azure Data Explorer:
- Wykonaj kroki pełne klastra w przewodniku Szybki start: tworzenie klastra i bazy danych usługi Azure Data Explorer.
- Przewodnik Szybki start dotyczący usługi Microsoft Fabric OneLake:
Broker MQTT IoT MQTT. Aby uzyskać więcej informacji na temat wdrażania brokera MQTT IoT MQTT, zobacz Szybki start: wdrażanie usługi Azure IoT Operations Preview w klastrze Kubernetes z obsługą usługi Arc.
Konfigurowanie wysyłania danych do usługi Microsoft Fabric OneLake przy użyciu tożsamości zarządzanej
Skonfiguruj łącznik usługi Data Lake w celu nawiązania połączenia z usługą Microsoft Fabric OneLake przy użyciu tożsamości zarządzanej.
Upewnij się, że zostały spełnione kroki opisane w wymaganiach wstępnych, w tym obszar roboczy usługi Microsoft Fabric i usługa Lakehouse. Nie można użyć domyślnego obszaru roboczego .
Upewnij się, że rozszerzenie usługi IoT MQ Arc jest zainstalowane i skonfigurowane przy użyciu tożsamości zarządzanej.
W witrynie Azure Portal przejdź do klastra Kubernetes połączonego z usługą Arc i wybierz pozycję Ustawienia> Extensions. Na liście rozszerzeń wyszukaj nazwę rozszerzenia IoT MQ. Nazwa zaczyna się od
mq-
pięciu losowych znaków. Na przykład mq-4jgjs.Pobierz identyfikator aplikacji skojarzony z tożsamością zarządzaną rozszerzenia IoT MQ Arc i zanotuj wartość identyfikatora GUID. Identyfikator aplikacji różni się od obiektu lub identyfikatora podmiotu zabezpieczeń. Interfejs wiersza polecenia platformy Azure można użyć, wyszukując identyfikator obiektu tożsamości zarządzanej, a następnie wysyłając zapytanie do identyfikatora aplikacji jednostki usługi skojarzonej z tożsamością zarządzaną. Na przykład:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Powinny zostać wyświetlone dane wyjściowe z wartością identyfikatora GUID:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Ten identyfikator GUID to identyfikator aplikacji, który należy użyć w następnym kroku.
W obszarze roboczym usługi Microsoft Fabric użyj pozycji Zarządzaj dostępem, a następnie wybierz pozycję + Dodaj osoby lub grupy.
Wyszukaj rozszerzenie IoT MQ Arc o nazwie "mq" i upewnij się, że wybrano wartość identyfikatora GUID identyfikatora aplikacji znalezioną w poprzednim kroku.
Wybierz pozycję Współautor jako rolę, a następnie wybierz pozycję Dodaj.
Utwórz zasób DataLake Połączenie or, który definiuje ustawienia konfiguracji i punktu końcowego dla łącznika. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:
target.fabricOneLake.endpoint
: punkt końcowy konta onelake usługi Microsoft Fabric. Adres URL punktu końcowego można uzyskać z usługi Microsoft Fabric Lakehouse w obszarze Właściwości plików>. Adres URL powinien wyglądać następująco:https://onelake.dfs.fabric.microsoft.com
.target.fabricOneLake.names
: nazwy obszaru roboczego i lakehouse. Użyj tego pola lubguids
. Nie używaj obu tych elementów.workspaceName
: nazwa obszaru roboczego.lakehouseName
: nazwa jeziora.
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Utwórz zasób DataLake Połączenie orTopicMap, który definiuje mapowanie między tematem MQTT a tabelą delty w usłudze Data Lake Storage. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:
dataLakeConnectorRef
: nazwa utworzonego wcześniej zasobu DataLake Połączenie or.clientId
: unikatowy identyfikator klienta MQTT.mqttSourceTopic
: nazwa tematu MQTT, z którego mają pochodzić dane.table.tableName
: nazwa tabeli, do której chcesz dołączyć element w lakehouse. Tabela jest tworzona automatycznie, jeśli nie istnieje.table.schema
: schemat tabeli delta, która powinna być zgodna z formatem i polami komunikatów JSON wysyłanych do tematu MQTT.
Zastosuj zasoby DataLake Połączenie or i DataLake Połączenie orTopicMap do klastra Kubernetes przy użyciu polecenia
kubectl apply -f datalake-connector.yaml
.Zacznij wysyłać komunikaty JSON do tematu MQTT przy użyciu wydawcy MQTT. Wystąpienie łącznika usługi Data Lake subskrybuje temat i pozyskuje komunikaty do tabeli delty.
Za pomocą przeglądarki sprawdź, czy dane są importowane do magazynu lakehouse. W obszarze roboczym usługi Microsoft Fabric wybierz swoją usługę Lakehouse, a następnie pozycję Tabele. Dane powinny być widoczne w tabeli.
Niezidentyfikowana tabela
Jeśli dane są wyświetlane w tabeli Niezidentyfikowane :
Przyczyną mogą być nieobsługiwane znaki w nazwie tabeli. Nazwa tabeli musi być prawidłową nazwą kontenera usługi Azure Storage, co oznacza, że może zawierać dowolną cyfrę angielską, górną lub małą literę oraz pasek underbar _
, o długości do 256 znaków. Nie są dozwolone żadne kreski -
ani znaki spacji.
Konfigurowanie wysyłania danych do usługi Azure Data Lake Storage Gen2 przy użyciu tokenu SAS
Skonfiguruj łącznik usługi Data Lake w celu nawiązania połączenia z kontem usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu tokenu sygnatury dostępu współdzielonego (SAS).
Uzyskaj token SAS dla konta usługi Azure Data Lake Storage Gen2 (ADLS Gen2). Na przykład użyj witryny Azure Portal, aby przejść do konta magazynu. W menu w obszarze Zabezpieczenia i sieć wybierz pozycję Sygnatura dostępu współdzielonego. Użyj poniższej tabeli, aby ustawić wymagane uprawnienia.
Parametr Wartość Dozwolone usługi Obiekt blob Dozwolone typy zasobów Obiekt, kontener Dozwolone uprawnienia Odczyt, zapis, usuwanie, lista, tworzenie Aby zoptymalizować pod kątem najniższych uprawnień, możesz również pobrać sygnaturę dostępu współdzielonego dla pojedynczego kontenera. Aby zapobiec błędom uwierzytelniania, upewnij się, że kontener jest zgodny z wartością
table.tableName
w konfiguracji mapy tematu.Utwórz wpis tajny kubernetes z tokenem SAS. Nie dołączaj znaku
?
zapytania, który może znajdować się na początku tokenu.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Utwórz zasób DataLake Połączenie or, który definiuje ustawienia konfiguracji i punktu końcowego dla łącznika. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:
endpoint
: punkt końcowy usługi Data Lake Storage konta magazynu ADLSv2 w postacihttps://example.blob.core.windows.net
. W witrynie Azure Portal znajdź punkt końcowy w obszarze Konto > magazynu Ustawienia > Endpoints > Data Lake Storage.accessTokenSecretName
: nazwa wpisu tajnego kubernetes zawierającego token SAS (my-sas
z poprzedniego przykładu).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Utwórz zasób DataLake Połączenie orTopicMap, który definiuje mapowanie między tematem MQTT a tabelą delty w usłudze Data Lake Storage. Możesz użyć podanego przykładu kodu YAML, ale pamiętaj, aby zmienić następujące pola:
dataLakeConnectorRef
: nazwa utworzonego wcześniej zasobu DataLake Połączenie or.clientId
: unikatowy identyfikator klienta MQTT.mqttSourceTopic
: nazwa tematu MQTT, z którego mają pochodzić dane.table.tableName
: nazwa kontenera, do którego chcesz dołączyć usługę Data Lake Storage. Jeśli token SYGNATURy dostępu współdzielonego jest w zakresie konta, kontener zostanie utworzony automatycznie, jeśli nie ma go.table.schema
: schemat tabeli delta, który powinien być zgodny z formatem i polami komunikatów JSON wysyłanych do tematu MQTT.
Zastosuj zasoby DataLake Połączenie or i DataLake Połączenie orTopicMap do klastra Kubernetes przy użyciu polecenia
kubectl apply -f datalake-connector.yaml
.Zacznij wysyłać komunikaty JSON do tematu MQTT przy użyciu wydawcy MQTT. Wystąpienie łącznika usługi Data Lake subskrybuje temat i pozyskuje komunikaty do tabeli delty.
W witrynie Azure Portal sprawdź, czy tabela delta została utworzona. Pliki są uporządkowane według identyfikatora klienta, nazwy wystąpienia łącznika, tematu MQTT i czasu. W kontenerach konta >magazynu otwórz kontener określony w obiekcie DataLake Połączenie orTopicMap. Sprawdź, czy _delta_log istnieje, a pliki parque pokazują ruch MQTT. Otwórz plik parque, aby potwierdzić, że ładunek jest zgodny z tym, co zostało wysłane i zdefiniowane w schemacie.
Używanie tożsamości zarządzanej do uwierzytelniania w usłudze ADLSv2
Aby użyć tożsamości zarządzanej, określ ją jako jedyną metodę w obszarze DataLake Połączenie or authentication
. Użyj az k8s-extension show
polecenia , aby znaleźć identyfikator podmiotu zabezpieczeń rozszerzenia usługi IoT MQ Arc, a następnie przypisać rolę do tożsamości zarządzanej, która udziela uprawnień do zapisu na koncie magazynu, takiego jak Współautor danych obiektu blob usługi Storage. Aby dowiedzieć się więcej, zobacz Autoryzowanie dostępu do obiektów blob przy użyciu identyfikatora Entra firmy Microsoft.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Konfigurowanie wysyłania danych do usługi Azure Data Explorer przy użyciu tożsamości zarządzanej
Skonfiguruj łącznik usługi Data Lake do wysyłania danych do punktu końcowego usługi Azure Data Explorer przy użyciu tożsamości zarządzanej.
Upewnij się, że zostały spełnione kroki opisane w wymaganiach wstępnych, w tym pełny klaster usługi Azure Data Explorer. Opcja "bezpłatny klaster" nie działa.
Po utworzeniu klastra utwórz bazę danych do przechowywania danych.
Tabelę dla danych można utworzyć za pośrednictwem witryny Azure Portal i ręcznie utworzyć kolumny lub użyć języka KQL na karcie zapytania. Na przykład:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Włączanie pozyskiwania danych przesyłanych strumieniowo
Włącz pozyskiwanie strumieniowe w tabeli i bazie danych. Na karcie zapytania uruchom następujące polecenie, zastępując <DATABASE_NAME>
nazwę bazy danych:
.alter database <DATABASE_NAME> policy streamingingestion enable
Dodawanie tożsamości zarządzanej do klastra usługi Azure Data Explorer
Aby łącznik był uwierzytelniany w usłudze Azure Data Explorer, należy dodać tożsamość zarządzaną do klastra usługi Azure Data Explorer.
- W witrynie Azure Portal przejdź do klastra Kubernetes połączonego z usługą Arc i wybierz pozycję Ustawienia> Extensions. Na liście rozszerzeń wyszukaj nazwę rozszerzenia IoT MQ. Nazwa zaczyna się od
mq-
pięciu losowych znaków. Na przykład mq-4jgjs. Nazwa rozszerzenia IoT MQ jest taka sama jak nazwa tożsamości zarządzanej MQ. - W bazie danych usługi Azure Data Explorer wybierz pozycję Uprawnienia>Dodaj>ingestor. Wyszukaj nazwę tożsamości zarządzanej MQ i dodaj ją.
Aby uzyskać więcej informacji na temat dodawania uprawnień, zobacz Zarządzanie uprawnieniami klastra usługi Azure Data Explorer.
Teraz możesz przystąpić do wdrażania łącznika i wysyłania danych do usługi Azure Data Explorer.
Przykładowy plik wdrożenia
Przykładowy plik wdrożenia łącznika usługi Azure Data Explorer. Komentarze, które zaczynają się od TODO
, wymagają zastąpienia ustawień symboli zastępczych informacjami.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
W tym przykładzie dane z tematu azure-iot-operations/data/thermostat
są akceptowane z komunikatami w formacie JSON, takimi jak:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLake Połączenie or
Obiekt DataLake Połączenie or to niestandardowy zasób kubernetes, który definiuje konfigurację i właściwości wystąpienia łącznika usługi Data Lake. Łącznik usługi Data Lake pozyskuje dane z tematów MQTT do tabel delty na koncie usługi Data Lake Storage.
Pole specyfikacji zasobu DataLake Połączenie or zawiera następujące pola podrzędne:
protocol
: wersja MQTT. Może to być jeden zv5
lubv3
.image
: Pole obrazu określa obraz kontenera modułu łącznika usługi Data Lake. Ma następujące pola podrzędne:repository
: nazwa rejestru kontenerów i repozytorium, w którym jest przechowywany obraz.tag
: tag obrazu do użycia.pullPolicy
: zasady ściągania obrazu. Może to być jeden zAlways
elementów ,IfNotPresent
lubNever
.
instances
: liczba replik łącznika data lake do uruchomienia.logLevel
: poziom dziennika dla modułu łącznika usługi Data Lake. Może to być jeden ztrace
elementów ,debug
,info
,warn
,error
lubfatal
.databaseFormat
: format danych do pozyskiwania w usłudze Data Lake Storage. Może to być jeden zdelta
lubparquet
.target
: pole docelowe określa miejsce docelowe pozyskiwania danych. Może to byćdatalakeStorage
, ,adx
fabricOneLake
lublocalStorage
.datalakeStorage
: określa konfigurację i właściwości konta ADLSv2. Ma następujące pola podrzędne:endpoint
: adres URL punktu końcowego konta usługi Data Lake Storage. Nie dołączaj żadnego ukośnika/
końcowego .authentication
: Pole uwierzytelniania określa typ i poświadczenia dostępu do konta usługi Data Lake Storage. Może to być jeden z następujących elementów.accessTokenSecretName
: nazwa wpisu tajnego kubernetes do korzystania z uwierzytelniania tokenu dostępu współdzielonego dla konta usługi Data Lake Storage. To pole jest wymagane, jeśli typ toaccessToken
.systemAssignedManagedIdentity
: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędneaudience
: ciąg w postacihttps://<my-account-name>.blob.core.windows.net
grupy odbiorców tokenu tożsamości zarządzanej o określonym zakresie na poziomie konta lubhttps://storage.azure.com
dla dowolnego konta magazynu.
fabricOneLake
: określa konfigurację i właściwości usługi Microsoft Fabric OneLake. Ma następujące pola podrzędne:endpoint
: adres URL punktu końcowego usługi Microsoft Fabric OneLake. Zazwyczaj jesthttps://onelake.dfs.fabric.microsoft.com
to punkt końcowy globalny OneLake. Jeśli używasz regionalnego punktu końcowego, jest on w postacihttps://<region>-onelake.dfs.fabric.microsoft.com
. Nie dołączaj żadnego ukośnika/
końcowego . Aby dowiedzieć się więcej, zobacz Połączenie do usługi Microsoft OneLake.names
: określa nazwy obszaru roboczego i lakehouse. Użyj tego pola lubguids
. Nie używaj obu tych elementów. Ma następujące pola podrzędne:workspaceName
: nazwa obszaru roboczego.lakehouseName
: nazwa jeziora.
guids
: określa identyfikatory GUID obszaru roboczego i lakehouse. Użyj tego pola lubnames
. Nie używaj obu tych elementów. Ma następujące pola podrzędne:workspaceGuid
: identyfikator GUID obszaru roboczego.lakehouseGuid
: identyfikator GUID lakehouse.
fabricPath
: lokalizacja danych w obszarze roboczym Sieć szkieletowa. Może to być wartośćtables
lubfiles
. Jeśli taktables
, dane są przechowywane w usłudze Fabric OneLake jako tabele. Jeśli jestfiles
to , dane są przechowywane w usłudze Fabric OneLake jako pliki. Jeśli ma wartośćfiles
,databaseFormat
musi to byćparquet
.authentication
: Pole uwierzytelniania określa typ i poświadczenia dostępu do usługi Microsoft Fabric OneLake. To może byćsystemAssignedManagedIdentity
tylko na razie. Ma jedno pole podrzędne:systemAssignedManagedIdentity
: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędneaudience
: ciąg dla odbiorców tokenu tożsamości zarządzanej i musi to byćhttps://storage.azure.com
.
adx
: określa konfigurację i właściwości bazy danych usługi Azure Data Explorer. Ma następujące pola podrzędne:endpoint
: adres URL punktu końcowego klastra usługi Azure Data Explorer, taki jakhttps://<CLUSTER>.<REGION>.kusto.windows.net
. Nie dołączaj żadnego ukośnika/
końcowego .authentication
: Pole uwierzytelniania określa typ i poświadczenia na potrzeby uzyskiwania dostępu do klastra usługi Azure Data Explorer. To może byćsystemAssignedManagedIdentity
tylko na razie. Ma jedno pole podrzędne:systemAssignedManagedIdentity
: w przypadku korzystania z tożsamości zarządzanej przez system na potrzeby uwierzytelniania. Ma jedno pole podrzędneaudience
: ciąg dla odbiorców tokenu tożsamości zarządzanej i powinien to byćhttps://api.kusto.windows.net
.
localStorage
: określa konfigurację i właściwości lokalnego konta magazynu. Ma następujące pola podrzędne:volumeName
: nazwa woluminu zainstalowanego w każdym zasobniku łącznika.
localBrokerConnection
: służy do zastępowania domyślnej konfiguracji połączenia z brokerem MQTT IoT MQTT. Zobacz Zarządzanie lokalnym połączeniem brokera.
DataLake Połączenie orTopicMap
DataLake Połączenie orTopicMap to niestandardowy zasób kubernetes definiujący mapowanie między tematem MQTT a tabelą delty na koncie usługi Data Lake Storage. Zasób DataLake Połączenie orTopicMap odwołuje się do zasobu DataLake Połączenie or działającego na tym samym urządzeniu brzegowym i pozyskiwania danych z tematu MQTT do tabeli delty.
Pole specyfikacji zasobu DataLake Połączenie orTopicMap zawiera następujące pola podrzędne:
dataLakeConnectorRef
: nazwa zasobu DataLake Połączenie or, do którego należy ta mapa tematu.mapping
: Pole mapowania określa szczegóły i właściwości tematu MQTT oraz tabelę delta. Ma następujące pola podrzędne:allowedLatencySecs
: maksymalne opóźnienie w sekundach między odbieraniem komunikatu z tematu MQTT i pozyskiwaniem go do tabeli delty. To pole jest wymagane.clientId
: unikatowy identyfikator klienta MQTT, który subskrybuje temat.maxMessagesPerBatch
: maksymalna liczba komunikatów do pozyskiwania w jednej partii do tabeli delty. Ze względu na tymczasowe ograniczenie ta wartość musi być mniejsza niż 16, jeśliqos
jest ustawiona na 1. To pole jest wymagane.messagePayloadType
: typ ładunku wysyłanego do tematu MQTT. Może to być jeden zjson
lubavro
(jeszcze nieobsługiwany).mqttSourceTopic
: nazwa tematów MQTT do subskrybowania. Obsługuje notację wieloznacznych symboli wieloznacznych w temacie MQTT.qos
: jakość poziomu usług dla subskrybowania tematu MQTT. Może to być jeden z 0 lub 1.table
: Pole tabeli określa konfigurację i właściwości tabeli delta na koncie usługi Data Lake Storage. Ma następujące pola podrzędne:tableName
: nazwa tabeli delty do utworzenia lub dołączenia do elementu na koncie usługi Data Lake Storage. To pole jest również nazywane nazwą kontenera w przypadku użycia z usługą Azure Data Lake Storage Gen2. Może zawierać dowolną małą literę angielską i znak underbar_
, o długości do 256 znaków. Nie są dozwolone żadne kreski-
ani znaki spacji.tablePath
: nazwa bazy danych usługi Azure Data Explorer podczas korzystania zadx
łącznika typów.schema
: schemat tabeli delty, który powinien być zgodny z formatem i polami ładunku komunikatu. Jest to tablica obiektów z następującymi polami podrzędnymi:name
: nazwa kolumny w tabeli delty.format
: typ danych kolumny w tabeli delty. Może to być jeden zboolean
elementów ,int8
,int32
int64
uInt32
uInt64
uInt16
float16
float32
uInt8
int16
date32
timestamp
float64
binary
lub .utf8
Typy niepodpisane, takie jakuInt8
, nie są w pełni obsługiwane i są traktowane jako typy podpisane, jeśli określono tutaj.optional
: wartość logiczna wskazująca, czy kolumna jest opcjonalna, czy wymagana. To pole jest opcjonalne i domyślnie ma wartość false.mapping
: wyrażenie ścieżki JSON definiujące sposób wyodrębniania wartości kolumny z ładunku komunikatu MQTT. Wbudowane mapowania$client_id
, ,$topic
$property
i$received_time
są dostępne do użycia jako kolumny w celu wzbogacania kodu JSON w treści komunikatu MQTT. To pole jest wymagane. Użyj $property dla właściwości użytkownika MQTT. Na przykład $property.assetId reprezentuje wartość właściwości assetId z komunikatu MQTT.
Oto przykład zasobu DataLake Połączenie orTopicMap:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Ciągowany kod JSON, taki jak "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
nie jest obsługiwany i powoduje, że łącznik zgłasza błąd wartości null przez łącznik.
Przykładowy komunikat dotyczący tematu azure-iot-operations/data/thermostat
, który działa z tym schematem:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
Które mapuje na:
externalAssetId | assetName | CurrentTemperature | Ciśnienie | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | termostat-de | 5506 | 5506 | Dlc | 2024-04-02T22:36:03.1827681Z |
Ważne
Jeśli schemat danych zostanie zaktualizowany, na przykład typ danych zostanie zmieniony lub nazwa zostanie zmieniona, przekształcenie danych przychodzących może przestać działać. Jeśli wystąpi zmiana schematu, musisz zmienić nazwę tabeli danych.
Delta lub parquet
Obsługiwane są formaty różnicowe i parquet.
Zarządzanie lokalnym połączeniem brokera
Podobnie jak mostek MQTT, łącznik usługi Data Lake działa jako klient brokera MQTT IoT MQTT. Jeśli dostosowano port odbiornika lub uwierzytelnianie brokera MQTT IoT MQTT, przesłoń również lokalną konfigurację połączenia MQTT dla łącznika usługi Data Lake. Aby dowiedzieć się więcej, zobacz Połączenie lokalnego brokera mostka MQTT.
Powiązana zawartość
Publikowanie i subskrybowanie komunikatów MQTT przy użyciu usługi Azure IoT MQ (wersja zapoznawcza)