Guida introduttiva: Usare le pipeline del processore di dati per elaborare i dati dagli asset OPC UA

Importante

Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente disponibile in ANTEPRIMA. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.

Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.

In questo argomento di avvio rapido si usano pipeline di Azure IoT Data Processor (anteprima) per elaborare e arricchire i messaggi dagli asset OPC UA prima di inviare i dati a una lakehouse di Microsoft Fabric OneLake per l'archiviazione e l'analisi.

Prerequisiti

Prima di iniziare questa guida introduttiva, è necessario completare le guide introduttive seguenti:

È necessaria anche una sottoscrizione di Microsoft Fabric. È possibile iscriversi per ottenere una versione di valutazione gratuita di Microsoft Fabric (anteprima). Nella sottoscrizione di Microsoft Fabric verificare che le impostazioni seguenti siano abilitate per il tenant:

Per altre informazioni, vedere Informazioni sulle impostazioni del tenant in Microsoft Fabric>.

Quale problema risolveremo?

Prima di inviare dati al cloud per l'archiviazione e l'analisi, è possibile elaborare e arricchire i dati. Ad esempio, è possibile aggiungere informazioni contestualizzate ai dati oppure filtrare i dati che non sono rilevanti per l'analisi. Le pipeline del processore di dati IoT di Azure consentono di elaborare e arricchire i dati prima di inviarli al cloud.

Creare un'entità servizio

Per creare un'entità servizio che consente alla pipeline di accedere all'area di lavoro di Microsoft Fabric:

  1. Usare il comando seguente dell'interfaccia della riga di comando di Azure per creare un'entità servizio.

    az ad sp create-for-rbac --name <YOUR_SP_NAME> 
    
  2. L'output di questo comando include un appIdoggetto , displayName, passworde tenant. Prendere nota di questi valori da usare quando si configura l'accesso all'area di lavoro infrastruttura, creare un segreto e configurare una destinazione della pipeline:

    {
        "appId": "<app-id>",
        "displayName": "<name>",
        "password": "<client-secret>",
        "tenant": "<tenant-id>"
    }
    

Concedere l'accesso all'area di lavoro di Microsoft Fabric

Passare a Microsoft Fabric.

Per assicurarsi di visualizzare l'opzione Gestisci accesso nell'area di lavoro di Microsoft Fabric, creare una nuova area di lavoro:

  1. Selezionare Aree di lavoro nella barra di spostamento a sinistra e quindi selezionare Nuova area di lavoro:

    Screenshot that shows how to create a new Microsoft Fabric workspace.

  2. Immettere un nome per l'area di lavoro, ad esempio Il nome dell'area di lavoro AIO e selezionare Applica.

Per concedere all'entità servizio l'accesso all'area di lavoro di Microsoft Fabric:

  1. Passare all'area di lavoro di Microsoft Fabric e selezionare Gestisci accesso:

    Screenshot that shows how to access the Manage access option in a workspace.

  2. Selezionare Aggiungi persone o gruppi, quindi incollare il nome visualizzato dell'entità servizio nel passaggio precedente e concedere almeno l'accesso collaboratore .

    Screenshot that shows how to add a service principal to a workspace and add it to the contributor role.

  3. Selezionare Aggiungi per concedere le autorizzazioni di collaboratore all'entità servizio nell'area di lavoro.

Creare una lakehouse

Creare una lakehouse nell'area di lavoro di Microsoft Fabric:

  1. Passare a Ingegneria dei dati e quindi selezionare Lakehouse (anteprima):

    Screenshot that shows how to create a lakehouse.

  2. Immettere un nome per il lakehouse, ad esempio yourname_pipeline_destination e selezionare Crea.

Aggiungere un segreto al cluster

Per accedere alla lakehouse da una pipeline del responsabile del trattamento dei dati, è necessario abilitare il cluster per accedere ai dettagli dell'entità servizio creati in precedenza. È necessario configurare Azure Key Vault con i dettagli dell'entità servizio in modo che il cluster possa recuperarli.

Usare il comando seguente per aggiungere un segreto all'insieme di credenziali delle chiavi di Azure contenente il segreto client annotato quando è stata creata l'entità servizio. L'insieme di credenziali delle chiavi di Azure è stato creato nell'argomento di avvio rapido Distribuire le operazioni di Azure IoT in un cluster Kubernetes abilitato per Arc:

az keyvault secret set --vault-name <your-key-vault-name> --name AIOFabricSecret --value <client-secret>

Per aggiungere il riferimento al segreto al cluster Kubernetes, modificare la risorsa aio-default-spcsecretproviderclass :

  1. Immettere il comando seguente nel computer in cui è in esecuzione il cluster per modificare la risorsa aio-default-spcsecretproviderclass . La configurazione YAML per la risorsa viene aperta nell'editor predefinito:

    kubectl edit secretproviderclass aio-default-spc -n azure-iot-operations
    
  2. Aggiungere una nuova voce alla matrice di segreti per il nuovo segreto di Azure Key Vault. La spec sezione è simile all'esempio seguente:

    # Edit the object below. Lines beginning with a '#' will be ignored,
    # and an empty file will abort the edit. If an error occurs while saving this file will be
    # reopened with the relevant failures.
    #
    apiVersion: secrets-store.csi.x-k8s.io/v1
    kind: SecretProviderClass
    metadata:
      creationTimestamp: "2023-11-16T11:43:31Z"
      generation: 2
      name: aio-default-spc
      namespace: azure-iot-operations
      resourceVersion: "89083"
      uid: cda6add7-3931-47bd-b924-719cc862ca29
    spec:                                      
      parameters:                              
        keyvaultName: <this is the name of your key vault>         
        objects: |                             
          array:                               
            - |                                
              objectName: azure-iot-operations
              objectType: secret           
              objectVersion: ""            
            - |                            
              objectName: AIOFabricSecret  
              objectType: secret           
              objectVersion: ""            
        tenantId: <this is your tenant id>
        usePodIdentity: "false"                       
      provider: azure
    
  3. Salvare le modifiche e uscire dall'editor.

Il driver CSI aggiorna i segreti usando un intervallo di polling, pertanto il nuovo segreto non è disponibile per il pod finché non viene raggiunto l'intervallo di polling. Per aggiornare immediatamente il pod, riavviare i pod per il componente. Per il responsabile del trattamento dei dati, eseguire i comandi seguenti:

kubectl delete pod aio-dp-reader-worker-0 -n azure-iot-operations
kubectl delete pod aio-dp-runner-worker-0 -n azure-iot-operations

Creare una pipeline di base

Creare una pipeline di base per passare i dati a un argomento MQTT separato.

Nei passaggi seguenti lasciare tutti i valori predefiniti, se non diversamente specificato:

  1. Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.

  2. Per creare una nuova pipeline, selezionare + Crea pipeline.

  3. Selezionare Configura MQ di origine>, quindi immettere le informazioni nell'argomento MQTT dei dati del termostato e quindi selezionare Applica:

    Parametro valore
    Name input data
    Gestore tls://aio-mq-dmqtt-frontend:8883
    Authentication Service account token (SAT)
    Argomento azure-iot-operations/data/opc.tcp/opc.tcp-1/#
    Formato dati JSON
  4. Selezionare Trasforma dalle fasi della pipeline come seconda fase della pipeline. Immettere i valori seguenti e quindi selezionare Applica:

    Parametro Valore
    Display name passthrough
    Query .

    Questa semplice trasformazione JQ passa attraverso il messaggio in arrivo invariato.

  5. Infine, selezionare Aggiungi destinazione, selezionare MQ nell'elenco delle destinazioni, immettere le informazioni seguenti e quindi selezionare Applica:

    Parametro Valore
    Display name output data
    Gestore tls://aio-mq-dmqtt-frontend:8883
    Authentication Service account token (SAT)
    Argomento dp-output
    Formato dati JSON
    Percorso .payload
  6. Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in passthrough-data-pipeline.< Selezionare Applica.

  7. Selezionare Salva per salvare e distribuire la pipeline. La distribuzione di questa pipeline nel cluster richiede alcuni secondi.

  8. Eseguire il comando seguente per creare un ambiente shell nel pod mqtt-client creato nella guida introduttiva precedente:

    kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
    
  9. Nella shell nel pod mqtt-client connettersi di nuovo al broker MQ usando il client MQTT. Questa volta specificare l'argomento dp-output.

    mqttui -b mqtts://aio-mq-dmqtt-frontend:8883 -u '$sat' --password $(cat /var/run/secrets/tokens/mq-sat) --insecure "dp-output"
    
  10. Viene visualizzato lo stesso flusso di dati visualizzato in precedenza. Questo comportamento è previsto perché la pipeline di dati pass-through distribuita non trasforma i dati. La pipeline instrada i dati da un argomento MQTT a un altro.

I passaggi successivi sono la creazione di altre due pipeline per elaborare e contestualizzare i dati. Queste pipeline inviano i dati elaborati a un'infrastruttura lakehouse nel cloud per l'analisi.

Creare una pipeline di dati di riferimento

Creare una pipeline di dati di riferimento per archiviare temporaneamente i dati di riferimento in un set di dati di riferimento. Successivamente, si usano questi dati di riferimento per arricchire i dati inviati al lakehouse di Microsoft Fabric.

Nei passaggi seguenti lasciare tutti i valori predefiniti, se non diversamente specificato:

  1. Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.

  2. Selezionare + Crea pipeline per creare una nuova pipeline.

  3. Selezionare Configura MQ di origine>, quindi immettere le informazioni nell'argomento dei dati di riferimento e quindi selezionare Applica:

    Parametro valore
    Name reference data
    Gestore tls://aio-mq-dmqtt-frontend:8883
    Authentication Service account token (SAT)
    Argomento reference_data
    Formato dati JSON
  4. Selezionare + Aggiungi destinazione e impostare la destinazione su Set di dati di riferimento.

  5. Selezionare Crea nuovo accanto a Set di dati per configurare un set di dati di riferimento per archiviare i dati di riferimento per la contestualizzazione. Usare le informazioni nella tabella seguente per creare il set di dati di riferimento:

    Parametro valore
    Name equipment-data
    Ora di scadenza 1h
  6. Selezionare Crea per salvare i dettagli della destinazione del set di dati di riferimento. La distribuzione del set di dati nel cluster richiede alcuni secondi e diventa visibile nella visualizzazione elenco set di dati.

  7. Usare i valori nella tabella seguente per configurare la fase di destinazione. Selezionare quindi Applica:

    Parametro valore
    Name reference data output
    Set di dati equipment-data (selezionare dall'elenco a discesa)
  8. Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in reference-data-pipeline.< Selezionare Applica.

  9. Selezionare la fase centrale ed eliminarla. Usare quindi il cursore per connettere la fase di input alla fase di output. Il risultato è simile allo screenshot seguente:

    Screenshot that shows the reference data pipeline.

  10. Selezionare Salva per salvare la pipeline.

Per archiviare i dati di riferimento, pubblicarli come messaggio MQTT nell'argomento reference_data usando lo strumento mqttui:

  1. Creare un ambiente shell nel pod mqtt-client creato nella guida introduttiva precedente:

    kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh
    
  2. Pubblicare il messaggio:

    mqttui -b mqtts://aio-mq-dmqtt-frontend:8883 -u '$sat' --password $(cat /var/run/secrets/tokens/mq-sat) --insecure publish "reference_data" '{ "customer": "Contoso", "batch": 102, "equipment": "Boiler", "location": "Seattle", "isSpare": true }'
    

Dopo aver pubblicato il messaggio, la pipeline riceve il messaggio e archivia i dati nel set di dati di riferimento dei dati delle apparecchiature.

Creare una pipeline di dati per arricchire i dati

Creare una pipeline del responsabile del trattamento dei dati per elaborare e arricchire i dati prima di inviarli al lakehouse di Microsoft Fabric. Questa pipeline usa i dati archiviati nel set di dati di riferimento dei dati delle apparecchiature per arricchire i messaggi.

  1. Nel portale delle operazioni di Azure IoT passare a Pipeline di dati nel cluster.

  2. Selezionare + Crea pipeline per creare una nuova pipeline.

  3. Selezionare Configure source MQ (Configura MQ di origine>), usare le informazioni nella tabella seguente per immettere informazioni nell'argomento MQTT relativo ai dati del termostato e quindi selezionare Applica:

    Parametro Valore
    Display name OPC UA data
    Gestore tls://aio-mq-dmqtt-frontend:8883
    Authentication Service account token (SAT)
    Argomento azure-iot-operations/data/opc.tcp/opc.tcp-1/thermostat
    Formato dati JSON
  4. Per tenere traccia dell'ultimo valore noto (LKV) della temperatura, selezionare Fasi e selezionare Ultimi valori noti. Usare le informazioni riportate nelle tabelle seguenti per configurare la fase per tenere traccia delle LKV della temperatura per i messaggi con solo messaggi di stato caldaia, quindi selezionare Applica:

    Parametro Valore
    Display name lkv stage

    Aggiungere due proprietà:

    Percorso di input Percorso di output Ora di scadenza
    .payload.payload["temperature"] .payload.payload.temperature_lkv 01h
    .payload.payload["Tag 10"] .payload.payload.tag1_lkv 01h

    Questa fase arricchisce i messaggi in arrivo con i valori più recenti temperature e Tag 10 se mancanti. I valori più recenti rilevati vengono conservati per 1 ora. Se nel messaggio vengono visualizzate le proprietà rilevate, il valore più recente rilevato viene aggiornato per assicurarsi che i valori siano sempre aggiornati.

  5. Per arricchire il messaggio con i dati di riferimento contestuali, selezionare Arricchisci nelle fasi della pipeline. Configurare la fase usando i valori nella tabella seguente e quindi selezionare Applica:

    Parametro valore
    Name enrich with reference dataset
    Set di dati equipment-data (dall'elenco a discesa)
    Percorso di output .payload.enrich

    Questo passaggio arricchisce il messaggio OPC UA con i dati del set di dati equipment-data creato dalla pipeline di dati di riferimento.

    Poiché non si forniscono condizioni, il messaggio viene arricchito con tutti i dati di riferimento. È possibile usare join basati su ID (KeyMatch) e join basati su timestamp (PastNearest e FutureNearest) per filtrare i dati di riferimento arricchiti in base alle condizioni specificate.

  6. Per trasformare i dati, selezionare Trasforma dalle fasi della pipeline. Configurare la fase usando i valori nella tabella seguente e quindi selezionare Applica:

    Parametro Valore
    Display name construct full payload

    L'espressione jq seguente formatta la proprietà payload per includere tutti i valori di telemetria e tutte le informazioni contestuali come coppie chiave-valore:

    .payload = {
        assetName: .payload.dataSetWriterName,
        Timestamp: .payload.timestamp,
        Customer: .payload.enrich?.customer,
        Batch: .payload.enrich?.batch,
        Equipment: .payload.enrich?.equipment,
        IsSpare: .payload.enrich?.isSpare,
        Location: .payload.enrich?.location,
        CurrentTemperature : .payload.payload."temperature"?.Value,
        LastKnownTemperature: .payload.payload."temperature_lkv"?.Value,
        Pressure: (if .payload.payload | has("Tag 10") then .payload.payload."Tag 10"?.Value else .payload.payload."tag1_lkv"?.Value end)
    }
    

    Usare l'espressione precedente come espressione di trasformazione. Questa espressione di trasformazione compila un payload contenente solo le coppie chiave valore necessarie per i dati di telemetria e i dati contestuali. Rinomina anche i tag con nomi descrittivi.

  7. Infine, selezionare Aggiungi destinazione, selezionare Fabric Lakehouse, quindi immettere le informazioni seguenti per configurare la destinazione. È possibile trovare l'ID dell'area di lavoro e l'ID lakehouse dall'URL usato per accedere al lakehouse di Fabric. L'URL è simile al seguente: https://msit.powerbi.com/groups/<workspace ID>/lakehouses/<lakehouse ID>?experience=data-engineering.

    Parametro valore
    Name processed OPC UA data
    URL https://msit-onelake.pbidedicated.windows.net
    Authentication Service principal
    ID tenant ID tenant annotato in precedenza quando è stata creata l'entità servizio.
    ID client L'ID client è l'ID app annotato in precedenza quando è stata creata l'entità servizio.
    Segreto AIOFabricSecret : riferimento al segreto di Azure Key Vault aggiunto.
    Area di lavoro ID area di lavoro di Microsoft Fabric annotato in precedenza.
    Lakehouse L'ID lakehouse di cui hai preso nota in precedenza.
    Tabella OPCUA
    Percorso batch .payload

    Usare la configurazione seguente per configurare le colonne nell'output:

    Nome Tipo Percorso
    Timestamp: Timestamp: .Timestamp
    AssetName Stringa .assetName
    Cliente Stringa .Customer
    Batch Intero .Batch
    CurrentTemperature Mobile .CurrentTemperature
    LastKnownTemperature Mobile .LastKnownTemperature
    Pressione Mobile .Pressure
    IsSpare Boolean .IsSpare
  8. Selezionare il nome della pipeline, il nome> della pipeline e modificarlo in contestualizzato-data-pipeline.< Selezionare Applica.

  9. Selezionare Salva per salvare la pipeline.

  10. Dopo un breve periodo di tempo, i dati della pipeline iniziano a popolare la tabella nel lakehouse.

Screenshot that shows data from the pipeline appearing in the lakehouse table.

Suggerimento

Assicurarsi che nessun altro processo scriva nella tabella OPCUA nel lakehouse. Se si scrive nella tabella da più origini, è possibile che nella tabella vengano visualizzati dati danneggiati.

Come abbiamo risolto il problema?

In questo argomento di avvio rapido sono state usate le pipeline del responsabile del trattamento dei dati per elaborare i dati OPC UA prima di inviarli a microsoft Fabric lakehouse. Sono state usate le pipeline per:

  • Arricchire i dati con informazioni contestuali, ad esempio il nome del cliente e il numero di batch.
  • Compilare i punti dati mancanti usando gli ultimi valori noti.
  • Strutturare i dati in un formato appropriato per la tabella lakehouse.

Pulire le risorse

Se non si intende continuare a usare questa distribuzione, eliminare il cluster Kubernetes in cui sono state distribuite le operazioni di Azure IoT e rimuovere il gruppo di risorse di Azure che contiene il cluster.

È anche possibile eliminare l'area di lavoro di Microsoft Fabric.

Passaggio successivo

Avvio rapido: Distribuire operazioni IoT di Azure in un cluster Kubernetes abilitato per Arc