Senden von Daten von Azure IoT MQ Preview an Data Lake Storage
Wichtig
Die von Azure Arc aktivierte Azure IoT Operations Preview befindet sich derzeit in der VORSCHAU. Sie sollten diese Vorschausoftware nicht in Produktionsumgebungen verwenden.
Die zusätzlichen Nutzungsbestimmungen für Microsoft Azure-Vorschauen enthalten rechtliche Bedingungen. Sie gelten für diejenigen Azure-Features, die sich in der Beta- oder Vorschauversion befinden oder aber anderweitig noch nicht zur allgemeinen Verfügbarkeit freigegeben sind.
Sie können den Data Lake-Connector verwenden, um Daten vom Azure IoT MQ-Broker (Vorschau) an einen Data Lake wie Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake und Azure Data Explorer zu senden. Der Connector abonniert MQTT-Themen und erfasst die Nachrichten in die Delta-Tabellen im Data Lake Storage-Konto.
Voraussetzungen
Ein Data Lake Storage-Konto in Azure mit einem Container und einem Ordner für Ihre Daten. Weitere Informationen zur Erstellung eines Data Lake Storage erhalten Sie mit einer der folgenden Schnellstartoptionen:
- Microsoft Fabric OneLake Schnellstart:
- Erstellen eines Arbeitsbereichs, da die Standardoption mein Arbeitsbereich nicht unterstützt wird.
- Erstellen eines Lakehouse
- Azure Data Lake Storage Gen2 Schnellstart:
- Erstellen Sie ein Speicherkonto zur Verwendung mit Azure Data Lake Storage Gen2.
- Azure Data Explorer-Cluster:
- Führen Sie die Schritte für einen vollständigen Cluster unter Schnellstart: Erstellen eines Azure Data Explorer-Clusters und einer Datenbank aus.
- Microsoft Fabric OneLake Schnellstart:
Ein IoT MQ MQTT Vermittler Weitere Informationen zum Bereitstellen eines IoT MQ MQTT Vermittlers finden Sie im Schnellstart: Bereitstellen von Azure IoT Einsatz Preview in einem Arc-fähigen Kubernetes-Cluster.
Konfigurieren des Sendens von Daten an Microsoft Fabric OneLake mithilfe der verwalteten Identität
Konfigurieren Sie einen Data Lake Connector zum Herstellen einer Verbindung mit Microsoft Fabric OneLake unter Verwendung einer verwalteten Identität.
Stellen Sie sicher, dass die Schritte unter Voraussetzungen erfüllt sind, einschließlich eines Microsoft Fabric-Arbeitsbereichs und eines Lakehouse. Die Standardoption mein Arbeitsbereich kann nicht verwendet werden.
Stellen Sie sicher, dass die IoT MQ Arc-Erweiterung installiert und mit verwalteter Identität konfiguriert ist.
Wechseln Sie im Azure-Portal zum Arc-verbundenen Kubernetes-Cluster und wählen Sie Einstellungen>Erweiterungen aus. Suchen Sie in der Erweiterungsliste nach Ihrem IoT MQ-Erweiterungsnamen. Der Name beginnt mit
mq-
gefolgt von fünf zufälligen Zeichen. Beispiel: mq-4jgjs.Ermitteln Sie die mit der verwalteten Identität der IoT MQ Arc-Erweiterung verknüpfte App-ID und notieren Sie sich den GUID-Wert. Die App-ID unterscheidet sich von der Objekt- oder Prinzipal-ID. Sie können die Azure CLI verwenden, indem Sie die Objekt-ID der verwalteten Identität suchen und dann die App-ID des Dienstprinzipals abfragen, das mit der verwalteten Identität verbunden ist. Beispiel:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Sie sollten eine Ausgabe mit einem GUID-Wert erhalten:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Diese GUID ist die App-ID, die Sie im nächsten Schritt verwenden müssen.
Verwenden Sie im Microsoft Fabric-Arbeitsbereich Zugriff verwalten und wählen Sie dann + Personen oder Gruppen hinzufügen aus.
Suchen Sie nach der IoT MQ Arc-Erweiterung mit dem Namen „mq“ und stellen Sie sicher, dass Sie den GUID-Wert der App-ID auswählen, den Sie im vorherigen Schritt gefunden haben.
Wählen Sie die Rolle Mitwirkende*r und dann Hinzufügen aus.
Erstellen Sie eine DataLakeConnector-Ressource, die die Konfigurations- und Endpunkteinstellungen für den Connector definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:
target.fabricOneLake.endpoint
: Der Endpunkt des Microsoft Fabric OneLake-Kontos. Sie können die Endpunkt-URL von Microsoft Fabric Lakehouse unter Files>Eigenschaftenabrufen. Die URL sollte wiehttps://onelake.dfs.fabric.microsoft.com
aussehen.target.fabricOneLake.names
: Die Namen des Arbeitsbereichs und des Lakehouse Verwenden Sie entweder dieses Feld oderguids
. Sie sollten nicht beides verwenden.workspaceName
: Der Name des ArbeitsbereichslakehouseName
: Den Namen des Lakehouse
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Erstellen Sie eine DataLakeConnectorTopicMap-Ressource, die die Zuordnung zwischen dem MQTT-Thema und der Delta-Tabelle in Data Lake Storage definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:
dataLakeConnectorRef
: Den Namen der DataLakeConnector-Ressource, die Sie zuvor erstellt habenclientId
: Dies ist ein eindeutiger Bezeichner für Ihren MQTT-Client.mqttSourceTopic
: Den Namen des MQTT-Topics, von dem die Daten stammen sollentable.tableName
: Den Namen der Tabelle, die Sie im Lakehouse ergänzen möchten. Wenn die Tabelle nicht vorhanden ist, wird sie automatisch erstellt.table.schema
: Das Schema der Delta-Tabelle, das dem Format und den Feldern der JSON-Nachrichten entsprechen soll, die Sie an das MQTT-Thema senden.
Wenden Sie die Ressourcen DataLakeConnector und DataLakeConnectorTopicMap mit
kubectl apply -f datalake-connector.yaml
auf Ihren Kubernetes-Cluster an.Beginnen Sie mit dem Senden von JSON-Nachrichten an das MQTT-Thema über Ihren MQTT-Publisher. Die Data Lake Connector-Instanz abonniert das Thema und erfasst die Nachrichten in die Delta-Tabelle.
Überprüfen Sie mit einem Browser, ob die Daten in das Lakehouse importiert wurden. Wählen Sie im Arbeitsbereich von Microsoft Fabric Ihr Lakehouse und dann Tabellen. Die Daten sollten in der Tabelle angezeigt werden.
Tabelle „Unbekannt“
Wenn Ihre Daten in der Tabelle Unbekannt angezeigt werden:
Die Ursache könnten nicht unterstützte Zeichen im Tabellennamen sein. Der Tabellenname muss ein gültiger Azure Storage Containername sein, d.h. er kann jeden englischen Buchstaben, Groß- oder Kleinbuchstaben und den Unterstrich _
enthalten und bis zu 256 Zeichen lang sein. Es sind keine Bindestriche (-
) oder Leerzeichen erlaubt.
Konfigurieren des Sendens von Daten an Azure Data Lake Storage Gen2 mithilfe des SAS-Tokens
Konfigurieren Sie einen Data Lake Connector für die Verbindung zu einem Azure Data Lake Storage Gen2 (ADLS Gen2) Konto mit einem SAS-Token (Shared Access Signature).
Rufen Sie ein SAS-Token für ein Azure Data Lake Storage Gen2 (ADLS Gen2) Konto ab. Verwenden Sie zum Beispiel das Azure-Portal, um Ihr Speicherkonto aufzurufen. Wählen Sie im Menü unter Sicherheit und Netzwerk die Option Shared Access Signature aus. Verwenden Sie die folgende Tabelle, um die erforderlichen Berechtigungen festzulegen.
Parameter Wert Zulässige Dienste Blob Zulässige Ressourcentypen Objekt, Container Zugelassene Berechtigungen Lesen, Schreiben, Löschen, Auflisten, Erstellen Zur Optimierung für die geringste Berechtigung können Sie auch die SAS für einen einzelnen Container abrufen. Um Authentifizierungsfehler zu vermeiden, stellen Sie sicher, dass der Container dem
table.tableName
Wert in der Themenzuordnungskonfiguration entspricht.Erstellen Sie ein Kubernetes-Geheimnis mit dem SAS-Token. Verzichten Sie auf das Fragezeichen
?
, das am Anfang des Tokens stehen könnte.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Erstellen Sie eine DataLakeConnector-Ressource, die die Konfigurations- und Endpunkteinstellungen für den Connector definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:
endpoint
: Den Data Lake Storage-Endpunkt des ADLSv2 Speicherkontos in Form vonhttps://example.blob.core.windows.net
. Suchen Sie im Azure-Portal den Endpunkt unter Speicherkonto > Einstellungen > Endpunkte > Data Lake Storage.accessTokenSecretName
: Den Namen des Kubernetes-Geheimnisses, das das SAS-Token enthält (my-sas
aus dem vorherigen Beispiel).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Erstellen Sie eine DataLakeConnectorTopicMap-Ressource, die die Zuordnung zwischen dem MQTT-Thema und der Delta-Tabelle in Data Lake Storage definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:
dataLakeConnectorRef
: Den Namen der DataLakeConnector-Ressource, die Sie zuvor erstellt habenclientId
: Dies ist ein eindeutiger Bezeichner für Ihren MQTT-Client.mqttSourceTopic
: Den Namen des MQTT-Topics, von dem die Daten stammen sollentable.tableName
: Den Namen des Containers, den Sie in Data Lake Storage ergänzen möchten. Wenn das SAS-Token auf das Konto beschränkt ist, wird der Container automatisch erstellt, falls er noch nicht vorhanden ist.table.schema
: Das Schema der Delta-Tabelle, das dem Format und den Feldern der JSON-Nachrichten entsprechen sollte, die Sie an das MQTT-Thema senden.
Wenden Sie die Ressourcen DataLakeConnector und DataLakeConnectorTopicMap mit
kubectl apply -f datalake-connector.yaml
auf Ihren Kubernetes-Cluster an.Beginnen Sie mit dem Senden von JSON-Nachrichten an das MQTT-Thema über Ihren MQTT-Publisher. Die Data Lake Connector-Instanz abonniert das Thema und erfasst die Nachrichten in die Delta-Tabelle.
Überprüfen Sie mithilfe des Azure-Portals, ob die Delta-Tabelle erstellt wurde. Die Dateien werden nach Client-ID, Connectorinstanzname, MQTT-Thema und Zeit organisiert. Öffnen Sie in Ihrem Speicherkonto >Container den Container, den Sie in DataLakeConnectorTopicMap angegeben haben. Stellen Sie sicher, dass _delta_log vorhanden ist und die Parquet-Dateien MQTT-Datenverkehr anzeigen. Öffnen Sie eine Parquet-Datei, um zu bestätigen, dass die Payload mit dem gesendeten und definierten Schema übereinstimmt.
Verwenden der „Verwalteten Identität“ zur Authentifizierung bei ADLSv2
Um eine verwaltete Identität zu verwenden, geben Sie sie als einzige Authentifizierungsmethode unter DataLakeConnector authentication
an. Verwenden Sie az k8s-extension show
, um die Prinzipal-ID für die IoT MQ Arc-Erweiterung zu finden. Weisen Sie dann der verwalteten Identität eine Rolle zu, die Berechtigungen zum Schreiben in das Speicherkonto erteilt, z. B. „Storage Blob Data Contributor“. Weitere Informationen finden Sie unter Autorisieren des Zugriffs auf Blobs mit Microsoft Entra ID.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Konfigurieren, um Daten mithilfe der verwalteten Identität an Azure Data Explorer zu senden
Konfigurieren Sie den Data Lake-Connector, um Daten mithilfe der verwalteten Identität an einen Azure Data Explorer-Endpunkt zu senden.
Führen Sie die Schritte in den Voraussetzungen aus (einschließlich der Schritte für einen vollständigen Azure Data Explorer-Cluster). Die Option „Kostenloser Cluster“ funktioniert nicht.
Nachdem das Cluster erstellt wurde, erstellen Sie eine Datenbank zum Speichern Ihrer Daten.
Sie können eine Tabelle für bestimmte Daten über das Azure-Portal erstellen und Spalten manuell erstellen oder KQL auf der Abfrageregisterkarte verwenden. Beispiel:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Streamingerfassung aktivieren
Aktivieren Sie die Streamingerfassung für Ihre Tabelle und Datenbank. Führen Sie auf der Registerkarte „Abfrage“ den folgenden Befehl aus, und ersetzen Sie <DATABASE_NAME>
durch ihren Datenbanknamen:
.alter database <DATABASE_NAME> policy streamingingestion enable
Hinzufügen der verwalteten Identität zum Azure Data Explorer-Cluster
Damit sich der Connector beim Azure-Daten-Explorer authentifizieren kann, müssen Sie die verwaltete Identität dem Azure Data Explorer-Cluster hinzufügen.
- Wechseln Sie im Azure-Portal zum Arc-verbundenen Kubernetes-Cluster und wählen Sie Einstellungen>Erweiterungen aus. Suchen Sie in der Erweiterungsliste nach dem Namen Ihrer IoT MQ-Erweiterung. Der Name beginnt mit
mq-
gefolgt von fünf zufälligen Zeichen. Beispiel: mq-4jgjs. Der Name der IoT MQ-Erweiterung ist identisch mit dem Namen der verwalteten MQ-Identität. - Wählen Sie in Ihrer Azure Data Explorer-Datenbank Berechtigungen>Hinzufügen>Ingestor aus. Suchen Sie nach dem Namen der verwalteten MQ-Identität, und fügen Sie ihn hinzu.
Weitere Informationen zum Hinzufügen von Berechtigungen finden Sie unter Verwalten von Azure Data Explorer-Clusterberechtigungen.
Jetzt können Sie den Connector bereitstellen und Daten an Azure Data Explorer senden.
Beispielbereitstellungsdatei
Beispielbereitstellungsdatei für den Azure Data Explorer-Connector. Kommentare, die mit TODO
beginnen, erfordern, dass Sie Platzhaltereinstellungen durch Ihre Angaben ersetzen.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
In diesem Beispiel werden Daten aus dem azure-iot-operations/data/thermostat
-Thema mit Nachrichten im JSON-Format akzeptiert, z. B. folgendes:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLakeConnector
Ein DataLakeConnector ist eine benutzerdefinierte Kubernetes-Ressource, die die Konfiguration und Eigenschaften einer Data Lake Connector-Instanz definiert. Ein Data Lake Connector erfasst Daten aus MQTT-Themen in Delta-Tabellen in einem Data Lake Storage-Konto.
Das Feld Spezifikation einer DataLakeConnector-Ressource enthält die folgenden Unterfelder:
protocol
: Die MQTT-Version Kann entwederv5
oderv3
sein.image
: Das Image-Feld gibt das Containerimage des Data Lake Connector-Moduls an. Es hat die folgenden Subfelder:repository
: Den Namen der Containerregistrierung und des Repositorys, in dem das Image gespeichert ist.tag
: Das Tag des zu verwendenden Images.pullPolicy
: Die Pullrichtlinie für das Image. Kann entwederAlways
,IfNotPresent
oderNever
sein.
instances
: Die Anzahl der Replikate des auszuführenden Data Lake-Connectors.logLevel
: Die Protokollebene für das Data Lake Connector-Modul. Kann entwedertrace
,debug
,info
,warn
,error
, oderfatal
sein.databaseFormat
: Das Format der Daten, die in Data Lake Storage erfasst werden sollen. Kann entwederdelta
oderparquet
sein.target
: Das Zielfeld gibt das Ziel der Datenerfassung an. Es kanndatalakeStorage
,fabricOneLake
,adx
oderlocalStorage
sein.datalakeStorage
: Gibt die Konfiguration und die Eigenschaften des ADLSv2-Kontos an. Es hat die folgenden Subfelder:endpoint
: Die URL des Data Lake Storage-Kontoendpunkts. Fügen Sie keinen nachgestellten Schrägstrich/
ein.authentication
: Das Feld Authentifizierung gibt den Typ und die Anmeldeinformationen für den Zugriff auf das Data Lake Storage-Konto an. Folgende Werte sind möglich:accessTokenSecretName
: Der Name des Kubernetes-Geheimnisses für die Verwendung der gemeinsamen Zugriffstoken-Authentifizierung für das Data Lake Storage-Konto. Dieses Feld ist ein Pflichtfeld, wenn der TypaccessToken
verwendet wird.systemAssignedManagedIdentity
: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:audience
: Eine Zeichenfolge in Form vonhttps://<my-account-name>.blob.core.windows.net
für die Zielgruppe der verwalteten Identitätstoken, die auf die Kontoebene beschränkt ist, oderhttps://storage.azure.com
für ein beliebiges Speicherkonto.
fabricOneLake
: Gibt die Konfiguration und die Eigenschaften von Microsoft Fabric OneLake an. Es hat die folgenden Subfelder:endpoint
: Die URL des Microsoft Fabric OneLake-Endpunkts. In der Regelhttps://onelake.dfs.fabric.microsoft.com
, da dies der globale OneLake-Endpunkt ist. Wenn Sie einen regionalen Endpunkt verwenden, hat dieser die Formhttps://<region>-onelake.dfs.fabric.microsoft.com
. Fügen Sie keinen nachgestellten Schrägstrich/
ein. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit Microsoft OneLake.names
: Gibt die Namen des Arbeitsbereichs und des Lakehouse an. Verwenden Sie entweder dieses Feld oderguids
. Sie sollten nicht beides verwenden. Es hat die folgenden Subfelder:workspaceName
: Der Name des ArbeitsbereichslakehouseName
: Den Namen des Lakehouse
guids
: Gibt die GUIDs des Arbeitsbereichs und des Lakehouse an. Verwenden Sie entweder dieses Feld odernames
. Sie sollten nicht beides verwenden. Es hat die folgenden Subfelder:workspaceGuid
: Die GUID des Arbeitsbereichs.lakehouseGuid
: Die GUID des Lakehouse.
fabricPath
: Den Speicherort der Daten im Fabric-Arbeitsbereich. Es kann entwedertables
oderfiles
sein. Wenn estables
ist, werden die Daten in Fabric OneLake als Tabellen gespeichert. Wenn esfiles
ist, werden die Daten in Fabric OneLake als Dateien gespeichert. Wenn esfiles
ist, muss dasdatabaseFormat
parquet
sein.authentication
: Das Feld Authentifizierung gibt den Typ und die Anmeldedaten für den Zugriff auf Microsoft Fabric OneLake an. Kann im Moment nursystemAssignedManagedIdentity
sein. Es gibt ein Subfeld:systemAssignedManagedIdentity
: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:audience
: Eine Zeichenkette für die Zielgruppe des verwalteten Identitätstokens und sie musshttps://storage.azure.com
sein.
adx
: Gibt die Konfiguration und die Eigenschaften der Azure Data Explorer-Datenbank an. Es hat die folgenden Subfelder:endpoint
: Die URL des Azure Data Explorer-Clusterendpunkts, z. B.https://<CLUSTER>.<REGION>.kusto.windows.net
. Fügen Sie keinen nachgestellten Schrägstrich/
ein.authentication
: Das Authentifizierungsfeld gibt den Typ und die Anmeldeinformationen für den Zugriff auf den Azure Data Explorer-Cluster an. Kann im Moment nursystemAssignedManagedIdentity
sein. Es gibt ein Subfeld:systemAssignedManagedIdentity
: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:audience
: Eine Zeichenfolge für die Tokenzielgruppe der verwalteten Identität, diehttps://api.kusto.windows.net
lauten muss.
localStorage
: Gibt die Konfiguration und die Eigenschaften des lokalen Speicherkontos an. Es hat die folgenden Subfelder:volumeName
: Den Namen des Volumes, das in die einzelnen Connector-Pods eingebunden ist.
localBrokerConnection
: Wird verwendet, um die Standardverbindungskonfiguration zum IoT MQ MQTT Vermittler außer Kraft zu setzen. Siehe Verwalten der lokalen Brokerverbindung.
DataLakeConnectorTopicMap
Eine DataLakeConnectorTopicMap ist eine benutzerdefinierte Kubernetes-Ressource, die die Zuordnung zwischen einem MQTT-Thema und einer Delta-Tabelle in einem Data Lake Storage-Konto definiert. Eine DataLakeConnectorTopicMap-Ressource verweist auf eine DataLakeConnector-Ressource, die auf demselben Edge-Gerät läuft und Daten aus dem MQTT-Topic in die Delta-Tabelle erfasst.
Das Feld Spezifikation einer DataLakeConnectorTopicMap-Ressource enthält die folgenden Unterfelder:
dataLakeConnectorRef
: Den Namen der DataLakeConnector-Ressource, zu der diese Themenzuordnung gehört.mapping
: Das Mapping-Feld gibt die Details und Eigenschaften des MQTT-Themas und der Delta-Tabelle an. Es hat die folgenden Subfelder:allowedLatencySecs
: Die maximale Wartezeit in Sekunden zwischen dem Empfang einer Nachricht vom MQTT-Topic und ihrer Aufnahme in die Delta-Tabelle. Dieses Feld ist ein Pflichtfeld.clientId
: Einen eindeutigen Bezeichner für den MQTT-Client, der das Thema abonniert.maxMessagesPerBatch
: Die maximale Anzahl von Nachrichten, die in einem Batch in die Delta-Tabelle erfasst werden. Aufgrund einer vorübergehenden Einschränkung muss dieser Wert kleiner als 16 sein, wennqos
auf 1 festgelegt ist. Dieses Feld ist ein Pflichtfeld.messagePayloadType
: Den Typ der Payload, die an das MQTT-Thema gesendet wird. Kann entwederjson
oderavro
(noch nicht unterstützt) sein.mqttSourceTopic
: Den Namen der MQTT-Themen, die abonniert werden sollen. Unterstützt die MQTT-Themen-Platzhalternotation.qos
: Die Dienstgüte für das Abonnieren des MQTT-Themas. Kann entweder 0 oder 1 sein.table
: Das Feld Tabelle gibt die Konfiguration und die Eigenschaften der Delta-Tabelle im Data Lake Storage-Konto an. Es hat die folgenden Subfelder:tableName
: Den Namen der Delta-Tabelle, die im Data Lake Storage-Konto erstellt oder ergänzt werden soll. Dieses Feld wird auch als Containername bezeichnet, wenn es mit Azure Data Lake Storage Gen2 verwendet wird. Er kann jeden englischen Kleinbuchstaben und den Unterstrich (_
) enthalten und eine Länge von bis zu 256 Zeichen haben. Es sind keine Bindestriche (-
) oder Leerzeichen erlaubt.tablePath
: Der Name der Azure Data Explorer-Datenbank, wenn ein Connector vom Typadx
verwendet wird.schema
: Das Schema der Delta-Tabelle, das mit dem Format und den Feldern der Payload der Nachricht übereinstimmen sollte. Es handelt sich um ein Array von Objekten, die jeweils die folgenden Unterfelder enthalten:name
: Den Namen der Spalte in der Delta-Tabelle.format
: Den Datentyp der Spalte in der Delta-Tabelle. Kann entwederboolean
,int8
,int16
,int32
,int64
,uInt8
,uInt16
,uInt32
,uInt64
,float16
,float32
,float64
,date32
,timestamp
,binary
, oderutf8
sein. Typen ohne Vorzeichen, wieuInt8
, werden nicht vollständig unterstützt und werden wie Typen mit Vorzeichen behandelt, wenn sie hier angegeben werden.optional
: Ein boolescher Wert, der angibt, ob die Spalte optional ist oder nicht. Dieses Feld ist optional und hat den Standardwert false.mapping
: JSON-Pfadausdruck, der definiert, wie der Wert der Spalte aus der Payload der MQTT-Nachricht extrahiert werden soll. Die integrierten Zuordnungen$client_id
,$topic
,$property
und$received_time
stehen zur Verfügung, um den JSON-Code im MQTT-Nachrichtentext zu erweitern. Dieses Feld ist ein Pflichtfeld. Verwenden Sie $property für MQTT-Benutzereigenschaften. Beispielsweise stellt $property.assetId den Wert der assetId-Eigenschaft aus der MQTT-Nachricht dar.
Hier ist ein Beispiel für eine DataLakeConnectorTopicMap-Ressource:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Stringified mit Escapezeichen wie "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
wird nicht unterstützt und führt dazu, dass der Connector den Fehler convertor found a null value (Konverter hat einen Nullwert gefunden) ausgibt.
Eine Beispielnachricht für das Thema azure-iot-operations/data/thermostat
, das mit diesem Schema funktioniert:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
Dies entspricht:
externalAssetId | assetName | CurrentTemperature | Druck | mqttTopic | Zeitstempel |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | thermostat-de | 5506 | 5506 | dlc | 2024-04-02T22:36:03.1827681Z |
Wichtig
Wenn das Datenschema aktualisiert wird, z. B. wenn ein Datentyp oder ein Name geändert wird, funktioniert die Transformation der eingehenden Daten möglicherweise nicht mehr. Sie müssen den Namen der Datentabelle ändern, wenn sich das Schema ändert.
Delta oder Parquet
Es werden sowohl Delta- als auch Parquetformate unterstützt.
Verwalten der lokalen Brokerverbindung
Wie die MQTT-Brücke fungiert der Data Lake Connector als Client für den IoT MQ MQTT Vermittler. Wenn Sie den Listener-Port oder die Authentifizierung Ihres IoT MQ MQTT Vermittlers angepasst haben, überschreiben Sie auch die lokale MQTT-Verbindungskonfiguration für den Data Lake Connector. Weitere Informationen finden Sie unter MQTT-Bridge lokale Broker-Verbindung.
Zugehöriger Inhalt
Veröffentlichen und Abonnieren von MQTT-Nachrichten mit Azure IoT MQ (Vorschau)
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für