Guida introduttiva: Usare le pipeline del processore di dati per elaborare i dati dagli asset OPC UA
Importante
Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente disponibile in ANTEPRIMA. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.
Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.
In questo argomento di avvio rapido si usano pipeline di Azure IoT Data Processor (anteprima) per elaborare e arricchire i messaggi dagli asset OPC UA prima di inviare i dati a una lakehouse di Microsoft Fabric OneLake per l'archiviazione e l'analisi.
Prerequisiti
Prima di iniziare questa guida introduttiva, è necessario completare le guide introduttive seguenti:
- Avvio rapido: Distribuire operazioni IoT di Azure in un cluster Kubernetes abilitato per Arc
- Guida introduttiva: Aggiungere asset OPC UA al cluster operazioni IoT di Azure
È necessaria anche una sottoscrizione di Microsoft Fabric. È possibile iscriversi per ottenere una versione di valutazione gratuita di Microsoft Fabric (anteprima). Nella sottoscrizione di Microsoft Fabric verificare che le impostazioni seguenti siano abilitate per il tenant:
- Consentire alle entità servizio di usare API Power BI
- Gli utenti possono accedere ai dati archiviati in OneLake con app esterne a Fabric
Per altre informazioni, vedere Informazioni sulle impostazioni del tenant in Microsoft Fabric>.
Quale problema risolveremo?
Prima di inviare dati al cloud per l'archiviazione e l'analisi, è possibile elaborare e arricchire i dati. Ad esempio, è possibile aggiungere informazioni contestualizzate ai dati oppure filtrare i dati che non sono rilevanti per l'analisi. Le pipeline del processore di dati IoT di Azure consentono di elaborare e arricchire i dati prima di inviarli al cloud.
Creare un'entità servizio
Per creare un'entità servizio che consente alla pipeline di accedere all'area di lavoro di Microsoft Fabric:
Usare il comando seguente dell'interfaccia della riga di comando di Azure per creare un'entità servizio.
az ad sp create-for-rbac --name <YOUR_SP_NAME>
L'output di questo comando include un
appId
oggetto ,displayName
,password
etenant
. Prendere nota di questi valori da usare quando si configura l'accesso all'area di lavoro infrastruttura, creare un segreto e configurare una destinazione della pipeline:{ "appId": "<app-id>", "displayName": "<name>", "password": "<client-secret>", "tenant": "<tenant-id>" }
Concedere l'accesso all'area di lavoro di Microsoft Fabric
Passare a Microsoft Fabric.
Per assicurarsi di visualizzare l'opzione Gestisci accesso nell'area di lavoro di Microsoft Fabric, creare una nuova area di lavoro:
Selezionare Aree di lavoro nella barra di spostamento a sinistra e quindi selezionare Nuova area di lavoro:
Immettere un nome per l'area di lavoro, ad esempio Il nome dell'area di lavoro AIO e selezionare Applica.
Per concedere all'entità servizio l'accesso all'area di lavoro di Microsoft Fabric:
Passare all'area di lavoro di Microsoft Fabric e selezionare Gestisci accesso:
Selezionare Aggiungi persone o gruppi, quindi incollare il nome visualizzato dell'entità servizio nel passaggio precedente e concedere almeno l'accesso collaboratore .
Selezionare Aggiungi per concedere le autorizzazioni di collaboratore all'entità servizio nell'area di lavoro.
Creare una lakehouse
Creare una lakehouse nell'area di lavoro di Microsoft Fabric:
Passare a Ingegneria dei dati e quindi selezionare Lakehouse (anteprima):
Immettere un nome per il lakehouse, ad esempio yourname_pipeline_destination e selezionare Crea.
Aggiungere un segreto al cluster
Per accedere alla lakehouse da una pipeline del responsabile del trattamento dei dati, è necessario abilitare il cluster per accedere ai dettagli dell'entità servizio creati in precedenza. È necessario configurare Azure Key Vault con i dettagli dell'entità servizio in modo che il cluster possa recuperarli.
Usare il comando seguente per aggiungere un segreto all'insieme di credenziali delle chiavi di Azure contenente il segreto client annotato quando è stata creata l'entità servizio. L'insieme di credenziali delle chiavi di Azure è stato creato nell'argomento di avvio rapido Distribuire le operazioni di Azure IoT in un cluster Kubernetes abilitato per Arc:
az keyvault secret set --vault-name <your-key-vault-name> --name AIOFabricSecret --value <client-secret>
Per aggiungere il riferimento al segreto al cluster Kubernetes, modificare la risorsa aio-default-spcsecretproviderclass
:
Immettere il comando seguente nel computer in cui è in esecuzione il cluster per modificare la risorsa aio-default-spc
secretproviderclass
. La configurazione YAML per la risorsa viene aperta nell'editor predefinito:kubectl edit secretproviderclass aio-default-spc -n azure-iot-operations
Aggiungere una nuova voce alla matrice di segreti per il nuovo segreto di Azure Key Vault. La
spec
sezione è simile all'esempio seguente:# Edit the object below. Lines beginning with a '#' will be ignored, # and an empty file will abort the edit. If an error occurs while saving this file will be # reopened with the relevant failures. # apiVersion: secrets-store.csi.x-k8s.io/v1 kind: SecretProviderClass metadata: creationTimestamp: "2023-11-16T11:43:31Z" generation: 2 name: aio-default-spc namespace: azure-iot-operations resourceVersion: "89083" uid: cda6add7-3931-47bd-b924-719cc862ca29 spec: parameters: keyvaultName: <this is the name of your key vault> objects: | array: - | objectName: azure-iot-operations objectType: secret objectVersion: "" - | objectName: AIOFabricSecret objectType: secret objectVersion: "" tenantId: <this is your tenant id> usePodIdentity: "false" provider: azure
Salvare le modifiche e uscire dall'editor.
Il driver CSI aggiorna i segreti usando un intervallo di polling, pertanto il nuovo segreto non è disponibile per il pod finché non viene raggiunto l'intervallo di polling. Per aggiornare immediatamente il pod, riavviare i pod per il componente. Per il responsabile del trattamento dei dati, eseguire i comandi seguenti:
kubectl delete pod aio-dp-reader-worker-0 -n azure-iot-operations
kubectl delete pod aio-dp-runner-worker-0 -n azure-iot-operations
Creare una pipeline di base
Creare una pipeline di base per passare i dati a un argomento MQTT separato.
Nei passaggi seguenti lasciare tutti i valori predefiniti, se non diversamente specificato:
Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.
Per creare una nuova pipeline, selezionare + Crea pipeline.
Selezionare Configura MQ di origine>, quindi immettere le informazioni nell'argomento MQTT dei dati del termostato e quindi selezionare Applica:
Parametro valore Name input data
Gestore tls://aio-mq-dmqtt-frontend:8883
Authentication Service account token (SAT)
Argomento azure-iot-operations/data/opc.tcp/opc.tcp-1/#
Formato dati JSON
Selezionare Trasforma dalle fasi della pipeline come seconda fase della pipeline. Immettere i valori seguenti e quindi selezionare Applica:
Parametro Valore Display name passthrough
Query .
Questa semplice trasformazione JQ passa attraverso il messaggio in arrivo invariato.
Infine, selezionare Aggiungi destinazione, selezionare MQ nell'elenco delle destinazioni, immettere le informazioni seguenti e quindi selezionare Applica:
Parametro Valore Display name output data
Gestore tls://aio-mq-dmqtt-frontend:8883
Authentication Service account token (SAT)
Argomento dp-output
Formato dati JSON
Percorso .payload
Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in passthrough-data-pipeline.< Selezionare Applica.
Selezionare Salva per salvare e distribuire la pipeline. La distribuzione di questa pipeline nel cluster richiede alcuni secondi.
Eseguire il comando seguente per creare un ambiente shell nel pod mqtt-client creato nella guida introduttiva precedente:
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
Nella shell nel pod mqtt-client connettersi di nuovo al broker MQ usando il client MQTT. Questa volta specificare l'argomento
dp-output
.mqttui -b mqtts://aio-mq-dmqtt-frontend:8883 -u '$sat' --password $(cat /var/run/secrets/tokens/mq-sat) --insecure "dp-output"
Viene visualizzato lo stesso flusso di dati visualizzato in precedenza. Questo comportamento è previsto perché la pipeline di dati pass-through distribuita non trasforma i dati. La pipeline instrada i dati da un argomento MQTT a un altro.
I passaggi successivi sono la creazione di altre due pipeline per elaborare e contestualizzare i dati. Queste pipeline inviano i dati elaborati a un'infrastruttura lakehouse nel cloud per l'analisi.
Creare una pipeline di dati di riferimento
Creare una pipeline di dati di riferimento per archiviare temporaneamente i dati di riferimento in un set di dati di riferimento. Successivamente, si usano questi dati di riferimento per arricchire i dati inviati al lakehouse di Microsoft Fabric.
Nei passaggi seguenti lasciare tutti i valori predefiniti, se non diversamente specificato:
Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.
Selezionare + Crea pipeline per creare una nuova pipeline.
Selezionare Configura MQ di origine>, quindi immettere le informazioni nell'argomento dei dati di riferimento e quindi selezionare Applica:
Parametro valore Name reference data
Gestore tls://aio-mq-dmqtt-frontend:8883
Authentication Service account token (SAT)
Argomento reference_data
Formato dati JSON
Selezionare + Aggiungi destinazione e impostare la destinazione su Set di dati di riferimento.
Selezionare Crea nuovo accanto a Set di dati per configurare un set di dati di riferimento per archiviare i dati di riferimento per la contestualizzazione. Usare le informazioni nella tabella seguente per creare il set di dati di riferimento:
Parametro valore Name equipment-data
Ora di scadenza 1h
Selezionare Crea per salvare i dettagli della destinazione del set di dati di riferimento. La distribuzione del set di dati nel cluster richiede alcuni secondi e diventa visibile nella visualizzazione elenco set di dati.
Usare i valori nella tabella seguente per configurare la fase di destinazione. Selezionare quindi Applica:
Parametro valore Name reference data output
Set di dati equipment-data
(selezionare dall'elenco a discesa)Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in reference-data-pipeline.< Selezionare Applica.
Selezionare la fase centrale ed eliminarla. Usare quindi il cursore per connettere la fase di input alla fase di output. Il risultato è simile allo screenshot seguente:
Selezionare Salva per salvare la pipeline.
Per archiviare i dati di riferimento, pubblicarli come messaggio MQTT nell'argomento reference_data
usando lo strumento mqttui:
Creare un ambiente shell nel pod mqtt-client creato nella guida introduttiva precedente:
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
Pubblicare il messaggio:
mqttui -b mqtts://aio-mq-dmqtt-frontend:8883 -u '$sat' --password $(cat /var/run/secrets/tokens/mq-sat) --insecure publish "reference_data" '{ "customer": "Contoso", "batch": 102, "equipment": "Boiler", "location": "Seattle", "isSpare": true }'
Dopo aver pubblicato il messaggio, la pipeline riceve il messaggio e archivia i dati nel set di dati di riferimento dei dati delle apparecchiature.
Creare una pipeline di dati per arricchire i dati
Creare una pipeline del responsabile del trattamento dei dati per elaborare e arricchire i dati prima di inviarli al lakehouse di Microsoft Fabric. Questa pipeline usa i dati archiviati nel set di dati di riferimento dei dati delle apparecchiature per arricchire i messaggi.
Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.
Selezionare + Crea pipeline per creare una nuova pipeline.
Selezionare Configure source MQ (Configura MQ di origine>), usare le informazioni nella tabella seguente per immettere informazioni nell'argomento MQTT relativo ai dati del termostato e quindi selezionare Applica:
Parametro Valore Display name OPC UA data
Gestore tls://aio-mq-dmqtt-frontend:8883
Authentication Service account token (SAT)
Argomento azure-iot-operations/data/opc.tcp/opc.tcp-1/thermostat
Formato dati JSON
Per tenere traccia dell'ultimo valore noto (LKV) della temperatura, selezionare Fasi e selezionare Ultimi valori noti. Usare le informazioni riportate nelle tabelle seguenti per configurare la fase per tenere traccia delle LKV della temperatura per i messaggi con solo messaggi di stato caldaia, quindi selezionare Applica:
Parametro Valore Display name lkv stage
Aggiungere due proprietà:
Percorso di input Percorso di output Ora di scadenza .payload.payload["temperature"]
.payload.payload.temperature_lkv
01h
.payload.payload["Tag 10"]
.payload.payload.tag1_lkv
01h
Questa fase arricchisce i messaggi in arrivo con i valori più recenti
temperature
eTag 10
se mancanti. I valori più recenti rilevati vengono conservati per 1 ora. Se nel messaggio vengono visualizzate le proprietà rilevate, il valore più recente rilevato viene aggiornato per assicurarsi che i valori siano sempre aggiornati.Per arricchire il messaggio con i dati di riferimento contestuali, selezionare Arricchisci nelle fasi della pipeline. Configurare la fase usando i valori nella tabella seguente e quindi selezionare Applica:
Parametro valore Name enrich with reference dataset
Set di dati equipment-data
(dall'elenco a discesa)Percorso di output .payload.enrich
Questo passaggio arricchisce il messaggio OPC UA con i dati del set di dati equipment-data creato dalla pipeline di dati di riferimento.
Poiché non si forniscono condizioni, il messaggio viene arricchito con tutti i dati di riferimento. È possibile usare join basati su ID (
KeyMatch
) e join basati su timestamp (PastNearest
eFutureNearest
) per filtrare i dati di riferimento arricchiti in base alle condizioni specificate.Per trasformare i dati, selezionare Trasforma dalle fasi della pipeline. Configurare la fase usando i valori nella tabella seguente e quindi selezionare Applica:
Parametro Valore Display name construct full payload
L'espressione jq seguente formatta la proprietà payload per includere tutti i valori di telemetria e tutte le informazioni contestuali come coppie chiave-valore:
.payload = { assetName: .payload.dataSetWriterName, Timestamp: .payload.timestamp, Customer: .payload.enrich?.customer, Batch: .payload.enrich?.batch, Equipment: .payload.enrich?.equipment, IsSpare: .payload.enrich?.isSpare, Location: .payload.enrich?.location, CurrentTemperature : .payload.payload."temperature"?.Value, LastKnownTemperature: .payload.payload."temperature_lkv"?.Value, Pressure: (if .payload.payload | has("Tag 10") then .payload.payload."Tag 10"?.Value else .payload.payload."tag1_lkv"?.Value end) }
Usare l'espressione precedente come espressione di trasformazione. Questa espressione di trasformazione compila un payload contenente solo le coppie chiave valore necessarie per i dati di telemetria e i dati contestuali. Rinomina anche i tag con nomi descrittivi.
Infine, selezionare Aggiungi destinazione, selezionare Fabric Lakehouse, quindi immettere le informazioni seguenti per configurare la destinazione. È possibile trovare l'ID dell'area di lavoro e l'ID lakehouse dall'URL usato per accedere al lakehouse di Fabric. L'URL è simile al seguente:
https://msit.powerbi.com/groups/<workspace ID>/lakehouses/<lakehouse ID>?experience=data-engineering
.Parametro valore Name processed OPC UA data
URL https://msit-onelake.pbidedicated.windows.net
Authentication Service principal
ID tenant ID tenant annotato in precedenza quando è stata creata l'entità servizio. ID client L'ID client è l'ID app annotato in precedenza quando è stata creata l'entità servizio. Segreto AIOFabricSecret
: riferimento al segreto di Azure Key Vault aggiunto.Area di lavoro ID area di lavoro di Microsoft Fabric annotato in precedenza. Lakehouse L'ID lakehouse di cui hai preso nota in precedenza. Tabella OPCUA
Percorso batch .payload
Usare la configurazione seguente per configurare le colonne nell'output:
Nome Tipo Percorso Timestamp: Timestamp: .Timestamp
AssetName Stringa .assetName
Cliente Stringa .Customer
Batch Intero .Batch
CurrentTemperature Mobile .CurrentTemperature
LastKnownTemperature Mobile .LastKnownTemperature
Pressione Mobile .Pressure
IsSpare Boolean .IsSpare
Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in contestualizzato-data-pipeline.< Selezionare Applica.
Selezionare Salva per salvare la pipeline.
Dopo un breve periodo di tempo, i dati della pipeline iniziano a popolare la tabella nel lakehouse.
Suggerimento
Assicurarsi che nessun altro processo scriva nella tabella OPCUA nel lakehouse. Se si scrive nella tabella da più origini, è possibile che nella tabella vengano visualizzati dati danneggiati.
Come abbiamo risolto il problema?
In questo argomento di avvio rapido sono state usate le pipeline del responsabile del trattamento dei dati per elaborare i dati OPC UA prima di inviarli a microsoft Fabric lakehouse. Sono state usate le pipeline per:
- Arricchire i dati con informazioni contestuali, ad esempio il nome del cliente e il numero di batch.
- Compilare i punti dati mancanti usando gli ultimi valori noti.
- Strutturare i dati in un formato appropriato per la tabella lakehouse.
Pulire le risorse
Se non si intende continuare a usare questa distribuzione, eliminare il cluster Kubernetes in cui sono state distribuite le operazioni di Azure IoT e rimuovere il gruppo di risorse di Azure che contiene il cluster.
È anche possibile eliminare l'area di lavoro di Microsoft Fabric.
Passaggio successivo
Avvio rapido: Distribuire operazioni IoT di Azure in un cluster Kubernetes abilitato per Arc