Condividi tramite


Orchestrare i processi di Azure Databricks con Apache Airflow

Questo articolo descrive il supporto di Apache Airflow per orchestrare le pipeline di dati con Azure Databricks, contiene istruzioni per l'installazione e la configurazione di Airflow in locale e fornisce un esempio di distribuzione ed esecuzione di un flusso di lavoro di Azure Databricks con Airflow.

Orchestrazione dei processi in una pipeline di dati

Lo sviluppo e la distribuzione di una pipeline di elaborazione dati spesso richiedono la gestione di dipendenze complesse tra le attività. Ad esempio, una pipeline potrebbe leggere i dati da un'origine, pulire i dati, trasformare i dati puliti e scrivere i dati trasformati in una destinazione. È anche necessario il supporto per i test, la pianificazione e la risoluzione degli errori durante l'operazionalizzazione di una pipeline.

I sistemi del flusso di lavoro affrontano questi problemi consentendo di definire le dipendenze tra attività, pianificare l'esecuzione delle pipeline e monitorare i flussi di lavoro. Apache Airflow è una soluzione open source per la gestione e la pianificazione delle pipeline di dati. Il flusso di aria rappresenta le pipeline di dati come grafici aciclici diretti (DAG) di operazioni. Si definisce un flusso di lavoro in un file Python e Airflow gestisce la pianificazione e l'esecuzione. La connessione Airflow di Azure Databricks consente di sfruttare il motore Spark ottimizzato offerto da Azure Databricks con le funzionalità di pianificazione di Airflow.

Requisiti

  • L'integrazione tra Airflow e Azure Databricks richiede Airflow versione 2.5.0 e successive. Gli esempi in questo articolo vengono testati con Airflow versione 2.6.1.
  • Airflow richiede Python 3.8, 3.9, 3.10 o 3.11. Gli esempi in questo articolo vengono testati con Python 3.8.
  • Le istruzioni in questo articolo per installare ed eseguire Airflow richiedono pipenv per creare un ambiente virtuale Python.

Operatori del flusso di aria per Databricks

Un DAG airflow è costituito da attività, in cui ogni attività esegue un operatore airflow. Gli operatori airflow che supportano l'integrazione con Databricks vengono implementati nel provider Databricks.

Il provider Databricks include operatori per eseguire diverse attività in un'area di lavoro di Azure Databricks, tra cui l'importazione di dati in una tabella, l'esecuzione di query SQL e l'uso delle cartelle Git di Databricks.

Il provider Databricks implementa due operatori per l'attivazione dei processi:

Per creare un nuovo processo di Azure Databricks o reimpostare un processo esistente, il provider Databricks implementa DatabricksCreateJobsOperator. DatabricksCreateJobsOperator usa le richieste API POST /api/2.1/jobs/create e POST /api/2.1/jobs/reset. È possibile usare DatabricksCreateJobsOperator con per DatabricksRunNowOperator creare ed eseguire un processo.

Nota

L'uso degli operatori di Databricks per attivare un processo richiede la fornitura di credenziali nella configurazione della connessione Databricks. Vedere Creare un token di accesso personale di Azure Databricks per Airflow.

Gli operatori Databricks Airflow scrivono l'URL della pagina di esecuzione del processo nei log airflow ogni polling_period_seconds (il valore predefinito è 30 secondi). Per altre informazioni, vedere la pagina del pacchetto apache-airflow-providers-databricks nel sito Web Airflow.

Installare l'integrazione di Airflow di Azure Databricks in locale

Per installare Airflow e il provider Databricks in locale per il test e lo sviluppo, seguire questa procedura. Per altre opzioni di installazione di Airflow, inclusa la creazione di un'installazione di produzione, vedere installazione nella documentazione di Airflow.

Aprire un terminale ed eseguire i comandi seguenti:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Sostituire <firstname>, <lastname>e <email> con il nome utente e l'indirizzo di posta elettronica. Verrà richiesto di immettere una password per l'utente amministratore. Assicurarsi di salvare questa password perché è necessario accedere all'interfaccia utente di Airflow.

Questo script esegue i passaggi seguenti:

  1. Crea una directory denominata airflow e cambia in tale directory.
  2. pipenv Usa per creare e generare un ambiente virtuale Python. Databricks consiglia di usare un ambiente virtuale Python per isolare le versioni dei pacchetti e le dipendenze del codice in tale ambiente. Questo isolamento consente di ridurre le mancate corrispondenze impreviste della versione del pacchetto e i conflitti di dipendenza del codice.
  3. Inizializza una variabile di ambiente denominata AIRFLOW_HOME impostata sul percorso della airflow directory.
  4. Installa Airflow e i pacchetti del provider Airflow Databricks.
  5. Crea una airflow/dags directory. Airflow usa la dags directory per archiviare le definizioni di DAG.
  6. Inizializza un database SQLite usato da Airflow per tenere traccia dei metadati. In una distribuzione airflow di produzione è necessario configurare Airflow con un database standard. Il database SQLite e la configurazione predefinita per la airflow distribuzione airflow vengono inizializzati nella directory .
  7. Crea un utente amministratore per Airflow.

Suggerimento

Per confermare l'installazione del provider Databricks, eseguire il comando seguente nella directory di installazione di Airflow:

airflow providers list

Avviare il server Web Airflow e l'utilità di pianificazione

Il server Web Airflow è necessario per visualizzare l'interfaccia utente airflow. Per avviare il server Web, aprire un terminale nella directory di installazione di Airflow ed eseguire i comandi seguenti:

Nota

Se l'avvio del server Web Airflow non riesce a causa di un conflitto di porte, è possibile modificare la porta predefinita nella configurazione Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

L'utilità di pianificazione è il componente Airflow che pianifica i DAG. Per avviare l'utilità di pianificazione, aprire un nuovo terminale nella directory di installazione di Airflow ed eseguire i comandi seguenti:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Testare l'installazione di Airflow

Per verificare l'installazione di Airflow, è possibile eseguire uno dei DAG di esempio inclusi in Airflow:

  1. In una finestra del browser aprire http://localhost:8080/home. Accedere all'interfaccia utente di Airflow con il nome utente e la password creati durante l'installazione di Airflow. Viene visualizzata la pagina DaG Airflow.
  2. Fare clic sull'interruttore DAG Pause/Unpause (Sospendi/Annulla) per annullare la sospensione di uno dei gruppi di disponibilità di esempio, example_python_operatorad esempio .
  3. Attivare il DAG di esempio facendo clic sul pulsante Trigger DAG (Trigger DAG ).
  4. Fare clic sul nome del dag per visualizzare i dettagli, incluso lo stato di esecuzione del DAG.

Creare un token di accesso personale di Azure Databricks per Airflow

Airflow si connette a Databricks usando un token di accesso personale di Azure Databricks. Per creare un token di accesso personale:

  1. Nell'area di lavoro di Azure Databricks fare clic sul nome utente di Azure Databricks nella barra superiore e quindi selezionare Impostazioni nell'elenco a discesa.
  2. Fare clic su Sviluppatore.
  3. Accanto a Token di accesso fare clic su Gestisci.
  4. Fare clic su Generare nuovi token.
  5. (Facoltativo) Immettere un commento che consente di identificare questo token in futuro e modificare la durata predefinita del token di 90 giorni. Per creare un token senza durata (scelta non consigliata), lasciare vuota la casella Durata (giorni) (vuota).
  6. Fare clic su Genera.
  7. Copiare il token visualizzato in un percorso sicuro e quindi fare clic su Fine.

Nota

Assicurarsi di salvare il token copiato in un percorso sicuro. Non condividere il token copiato con altri utenti. Se si perde il token copiato, non è possibile rigenerare lo stesso token esatto. È invece necessario ripetere questa procedura per creare un nuovo token. Se si perde il token copiato o si ritiene che il token sia stato compromesso, Databricks consiglia vivamente di eliminare immediatamente il token dall'area di lavoro facendo clic sull'icona del cestino (Revoca) accanto al token nella pagina Token di accesso.

Se non è possibile creare o usare token nell'area di lavoro, questo potrebbe essere dovuto al fatto che l'amministratore dell'area di lavoro ha disabilitato i token o non ha concesso l'autorizzazione per creare o usare token. Vedere l'amministratore dell'area di lavoro o quanto segue:

Nota

Come procedura consigliata per la sicurezza, quando si esegue l'autenticazione con strumenti automatizzati, sistemi, script e app, Databricks consiglia di usare token di accesso personali appartenenti alle entità servizio anziché agli utenti dell'area di lavoro. Per creare token per le entità servizio, vedere Gestire i token per un'entità servizio.

È anche possibile eseguire l'autenticazione ad Azure Databricks usando un token microsoft Entra ID (in precedenza Azure Active Directory). Vedere Databricks Connection (Connessione a Databricks) nella documentazione di Airflow.

Configurare una connessione di Azure Databricks

L'installazione di Airflow contiene una connessione predefinita per Azure Databricks. Per aggiornare la connessione per connettersi all'area di lavoro usando il token di accesso personale creato in precedenza:

  1. In una finestra del browser aprire http://localhost:8080/connection/list/. Se viene richiesto di accedere, immettere il nome utente e la password dell'amministratore.
  2. In ID Conn individuare databricks_default e fare clic sul pulsante Modifica record .
  3. Sostituire il valore nel campo Host con il nome dell'istanza dell'area di lavoro della distribuzione di Azure Databricks, https://adb-123456789.cloud.databricks.comad esempio .
  4. Nel campo Password immettere il token di accesso personale di Azure Databricks.
  5. Fare clic su Salva.

Se si usa un token ID Microsoft Entra, vedere Connessione a Databricks nella documentazione di Airflow per informazioni sulla configurazione dell'autenticazione.

Esempio: Creare un DAG airflow per eseguire un processo di Azure Databricks

L'esempio seguente illustra come creare una semplice distribuzione airflow eseguita nel computer locale e distribuisce un esempio di DAG per attivare le esecuzioni in Azure Databricks. In questo esempio si eseguiranno le seguenti attività:

  1. Creare un nuovo notebook e aggiungere codice per stampare un messaggio di saluto in base a un parametro configurato.
  2. Creare un processo di Azure Databricks con una singola attività che esegue il notebook.
  3. Configurare una connessione Airflow all'area di lavoro di Azure Databricks.
  4. Creare un DAG Airflow per attivare il processo del notebook. Il dag viene definito in uno script Python usando DatabricksRunNowOperator.
  5. Usare l'interfaccia utente airflow per attivare il DAG e visualizzare lo stato dell'esecuzione.

Creare un notebook

Questo esempio usa un notebook contenente due celle:

  • La prima cella contiene un widget di testo Databricks Utilities che definisce una variabile denominata greeting impostata sul valore worldpredefinito .
  • La seconda cella stampa il valore della greeting variabile preceduta da hello.

Per creare il notebook:

  1. Passare all'area di lavoro di Azure Databricks, fare clic su Nuova icona Nuovo nella barra laterale e selezionare Notebook.

  2. Assegnare un nome al notebook, ad esempio Hello Airflow, e assicurarsi che il linguaggio predefinito sia impostato su Python.

  3. Copiare il codice Python seguente e incollarlo nella prima cella del notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Aggiungere una nuova cella sotto la prima cella e copiare e incollare il codice Python seguente nella nuova cella:

    print("hello {}".format(greeting))
    

Creare un processo

  1. Fare clic su Icona Flussi di lavoro Flussi di lavoro nella barra laterale.

  2. Fare clic su Pulsante Crea processo.

    Viene visualizzata la scheda Attività con la finestra di dialogo Crea attività.

    Finestra di dialogo Crea prima attività

  3. Sostituire Aggiungi un nome per il processo con il nome del processo.

  4. Nel campo Nome attività immettere un nome per l'attività, ad esempio greeting-task.

  5. Nel menu a discesa Tipo selezionare Notebook.

  6. Nel menu a discesa Origine selezionare Area di lavoro.

  7. Fare clic sulla casella di testo Percorso e usare il browser file per trovare il notebook creato, fare clic sul nome del notebook e fare clic su Conferma.

  8. Fare clic su Aggiungi in Parametri. Nel campo Chiave immettere greeting. Nel campo Valore immettere Airflow user.

  9. Fare clic su Crea attività.

Nel pannello Dettagli processo copiare il valore id processo. Questo valore è necessario per attivare il processo da Airflow.

Eseguire il processo

Per testare il nuovo processo nell'interfaccia utente dei flussi di lavoro di Azure Databricks, fare clic Pulsante Esegui ora nell'angolo in alto a destra. Al termine dell'esecuzione, è possibile verificare l'output visualizzando i dettagli dell'esecuzione del processo.

Creare un nuovo DAG airflow

Si definisce un DAG Airflow in un file Python. Per creare un daG per attivare il processo del notebook di esempio:

  1. In un editor di testo o in un IDE creare un nuovo file denominato databricks_dag.py con il contenuto seguente:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Sostituire JOB_ID con il valore dell'ID processo salvato in precedenza.

  2. Salvare il file nella airflow/dags directory. Airflow legge e installa automaticamente i file DAG archiviati in airflow/dags/.

Installare e verificare il DAG in Airflow

Per attivare e verificare il DAG nell'interfaccia utente airflow:

  1. In una finestra del browser aprire http://localhost:8080/home. Viene visualizzata la schermata DaG airflow.
  2. Individuare databricks_dag e fare clic sull'interruttore DAG Pause/Unpause per rimuovere il dag.
  3. Attivare il DAG facendo clic sul pulsante Trigger DAG (Trigger DAG ).
  4. Fare clic su un'esecuzione nella colonna Esecuzioni per visualizzare lo stato e i dettagli dell'esecuzione.