Condividi tramite


Orchestrazione dei processi Apache Flink® con il gestore dell'orchestrazione del flusso di lavoro di Azure Data Factory (basato su Apache Airflow)

Nota

Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.

Solo il supporto di base sarà disponibile fino alla data di ritiro.

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.

Questo articolo illustra la gestione di un processo Flink usando l'API REST di Azure e la pipeline di dati di orchestrazione con il gestore dell'orchestrazione del flusso di lavoro di Azure Data Factory. Il servizio Gestore dell'orchestrazione del flusso di lavoro di Azure Data Factory è un modo semplice ed efficiente per creare e gestire ambienti Apache Airflow, in modo da eseguire facilmente pipeline di dati su larga scala.

Apache Airflow è una piattaforma open source che crea, pianifica e monitora flussi di lavoro di dati complessi a livello di programmazione. Consente di definire un set di attività, denominate operatori, che possono essere combinate in grafi diretti aciclici (DAG) per rappresentare le pipeline di dati.

Il diagramma seguente illustra il posizionamento di Airflow, Key Vault e HDInsight su AKS in Azure.

Screenshot che mostra il posizionamento del flusso d’aria, dell'insieme di credenziali delle chiavi e di HDInsight su AKS in Azure.

In base all'ambito, vengono create più entità servizio si Azure per limitare l'accesso necessario e gestire il ciclo di vita delle credenziali del client in modo indipendente.

È consigliabile ruotare periodicamente le chiavi di accesso o i segreti.

Passaggi di configurazione

  1. Configurare il cluster Flink

  2. Caricare il file JAR del processo Flink nell'account di archiviazione. Può trattarsi dell'account di archiviazione primario associato al cluster Flink o di qualsiasi altro account di archiviazione, in cui è necessario assegnare il ruolo di "Proprietario dei dati BLOB di archiviazione" all'identità gestita assegnata all'utente e usata per il cluster in questo account di archiviazione.

  3. Azure Key Vault: è possibile seguire questa esercitazione per creare un nuovo insieme di credenziali delle chiavi di Azure nel caso in cui non ne sia disponibile uno.

  4. Creare un’entità servizio Microsoft Entra per accedere a Key Vault: concedere l'autorizzazione per accedere all’insieme di credenziali delle chiavi di Azure con il ruolo "Responsabile dei segreti di Key Vault" e prendere nota della password "appId" e del "tenant" nella risposta. È necessario che Airflow usi lo stesso sistema di archiviazione di Key Vault come back-end per l'archiviazione di informazioni riservate.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Abilitare Gestore dell'orchestrazione del flusso di lavoro di Azure Key Vault per archiviare e gestire le informazioni riservate in modo sicuro e centralizzato. In questo modo è possibile usare variabili e connessioni, che vengono automaticamente archiviate in Azure Key Vault. Il nome delle connessioni e delle variabili deve essere preceduto da variables_prefix definito in AIRFLOW__SECRETS__BACKEND_KWARGS. Ad esempio, se il prefisso variables_prefix presenta un valore come hdinsight-aks-variables, nel caso di una variabile chiave come hello, è consigliabile archiviare la variabile in hdinsight-aks-variable -hello.

    • Aggiungere le impostazioni seguenti per le sostituzioni della configurazione airflow nelle proprietà di runtime integrate:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Aggiungere l'impostazione seguente per la configurazione delle variabili d'ambiente nelle proprietà del runtime integrato di Airflow:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Aggiungere i requisiti di Airflow seguenti: apache-airflow-providers-microsoft-azure

      La schermata mostra la configurazione del flusso d'aria e le variabili d'ambiente.

  6. Creare entità servizio Microsoft Entra per accedere ad Azure: concedere l'autorizzazione per accedere a HDInsight su AKS con il ruolo Collaboratore, prendere nota di appId, password e tenant dalla risposta.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Creare i segreti seguenti nell'insieme di credenziali delle chiavi con il valore di appId, password e tenant dell'entità servizio AD precedente, preceduto dalla proprietà variables_prefix definito in AIRFLOW__SECRETS__BACKEND_KWARGS. Il codice DAG può accedere a una delle variabili seguenti senza variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Definizione di DAG

Il concetto di DAG (Grafo Aciclico Diretto) costituisce il nucleo di Airflow, che raggruppa i Tasks, organizzandoli in base alle dipendenze e alle relazioni che indicano il modo in cui devono essere eseguiti.

Per dichiarare un DAG sono disponibili tre modi:

  1. È possibile utilizzare un gestore del contesto, che aggiunge il DAG a qualsiasi elemento al suo interno in modo implicito

  2. È possibile usare un costruttore standard, passando il DAG a qualsiasi operatore usato

  3. È possibile utilizzare il decoratore @dag per trasformare una funzione in un generatore di DAG (da airflow.decorators import dag)

I DAG non rappresentano nulla in mancanza di Tasks da eseguire, che si presentano sotto forma di Operatori, Sensori o TaskFlow.

Per informazioni più dettagliate sui DAG, Flusso di controllo, SubDAG, TaskGroup, e così via, visitare direttamente il sito diApache Airflow. 

Esecuzione dei DAG

Il codice di esempio è disponibile in Git. Scaricare il codice in locale nel computer e caricare il file wordcount.py in un archivio BLOB. Seguire la procedura per importare DAG nel flusso di lavoro creato durante la configurazione.

Il wordcount.py è un esempio di orchestrazione di un invio di processo Flink tramite Apache Airflow con HDInsight su AKS. Il DAG ha due attività:

  • get OAuth Token

  • Richiamare l'API REST per l'invio di processi Flink di HDInsight per inviare un nuovo processo

Il DAG prevede di avere la configurazione per l'entità servizio, come descritto durante il processo di installazione per le credenziali client OAuth e il passaggio della configurazione di input seguente per l'esecuzione.

Passaggi di esecuzione

  1. Eseguire il DAG dall'interfaccia utente di Airflow. È possibile aprire l'interfaccia utente di Gestore dell'orchestrazione del flusso di lavoro di Azure Data Factory facendo clic sull'icona di Monitoraggio.

    Screenshot che mostra l'apertura dell'interfaccia utente del gestore dell'orchestrazione del flusso di lavoro di Azure Data Factory facendo clic sull'icona di monitoraggio.

  2. Selezionare il DAG "FlinkWordCountExample" nella pagina "DAG".

    Screenshot che mostra la selezione dell'esempio di conteggio parole Flink.

  3. Fare clic sull'icona "esegui" nell'angolo in alto a destra e selezionare "Attiva DAG con configurazione".

    Screenshot che mostra l'icona di esecuzione selezionata.

  4. Passare il file JSON di configurazione richiesto

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscritpion":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Fare clic sul pulsante "Attiva" per avviare l'esecuzione del DAG.

  6. È possibile visualizzare lo stato delle attività del DAG dalla relativa esecuzione

    Screenshot che mostra lo stato dell'attività DAG.

  7. Convalidare l'esecuzione del processo dal portale

    Screenshot che mostra la convalida dell'esecuzione del processo.

  8. Convalidare il processo da "Apache Flink Dashboard"

    Screenshot che mostra il dashboard Apache Flink.

Codice di esempio

Questo è un esempio di orchestrazione della pipeline di dati usando Airflow con HDInsight su AKS.

Il DAG prevede di avere la configurazione per l'entità servizio per le credenziali client OAuth e di passare la configurazione di input seguente per l'esecuzione:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscritpion':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Riferimento