Creare una pipeline di dati end-to-end in Databricks

Questo articolo illustra come creare e distribuire una pipeline di elaborazione dati end-to-end, tra cui come inserire dati non elaborati, trasformare i dati ed eseguire analisi sui dati elaborati.

Nota

Anche se questo articolo illustra come creare una pipeline di dati completa usando notebook di Databricks e un processo di Azure Databricks per orchestrare un flusso di lavoro, Databricks consiglia di usare tabelle Live Delta, un'interfaccia dichiarativa per la creazione di pipeline di elaborazione dati affidabili, gestibili e testabili.

Che cos'è una pipeline di dati?

Una pipeline di dati implementa i passaggi necessari per spostare i dati dai sistemi di origine, trasformare i dati in base ai requisiti e archiviare i dati in un sistema di destinazione. Una pipeline di dati include tutti i processi necessari per trasformare i dati non elaborati in dati preparati che gli utenti possono utilizzare. Ad esempio, una pipeline di dati potrebbe preparare i dati in modo che gli analisti dei dati e i data scientist possano estrarre valore dai dati tramite analisi e creazione di report.

Un flusso di lavoro di estrazione, trasformazione e caricamento (ETL) è un esempio comune di una pipeline di dati. Nell'elaborazione ETL i dati vengono inseriti dai sistemi di origine e scritti in un'area di gestione temporanea, trasformati in base ai requisiti (garantire la qualità dei dati, deduplicare i record e così via) e quindi scritti in un sistema di destinazione, ad esempio un data warehouse o un data lake.

Passaggi della pipeline di dati

Per iniziare a creare pipeline di dati in Azure Databricks, l'esempio incluso in questo articolo illustra come creare un flusso di lavoro di elaborazione dati:

  • Usare le funzionalità di Azure Databricks per esplorare un set di dati non elaborato.
  • Creare un notebook di Databricks per inserire dati di origine non elaborati e scrivere i dati non elaborati in una tabella di destinazione.
  • Creare un notebook di Databricks per trasformare i dati di origine non elaborati e scrivere i dati trasformati in una tabella di destinazione.
  • Creare un notebook di Databricks per eseguire query sui dati trasformati.
  • Automatizzare la pipeline di dati con un processo di Azure Databricks.

Requisiti

Esempio: Set di dati Million Song

Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.

Passaggio 1: Creare un cluster

Per eseguire l'elaborazione e l'analisi dei dati in questo esempio, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.

Nota

Poiché questo esempio usa un set di dati di esempio archiviato in DBFS e consiglia di rendere persistenti le tabelle in Unity Catalog, si crea un cluster configurato con la modalità di accesso utente singolo. La modalità di accesso utente singolo fornisce l'accesso completo a DBFS, consentendo anche l'accesso a Unity Catalog. Vedere Procedure consigliate per DBFS e Il catalogo unity.

  1. Fare clic su Calcolo nella barra laterale.
  2. Nella pagina Calcolo fare clic su Crea cluster.
  3. Nella pagina Nuovo cluster immettere un nome univoco per il cluster.
  4. In modalità di accesso selezionare Utente singolo.
  5. In Accesso utente singolo o entità servizio selezionare il nome utente.
  6. Lasciare i valori rimanenti nello stato predefinito e fare clic su Crea cluster.

Per altre informazioni sui cluster Databricks, vedere Calcolo.

Passaggio 2: Esplorare i dati di origine

Per informazioni su come usare l'interfaccia di Azure Databricks per esplorare i dati di origine non elaborati, vedere Esplorare i dati di origine per una pipeline di dati. Se si vuole passare direttamente all'inserimento e alla preparazione dei dati, continuare con il passaggio 3: Inserire i dati non elaborati.

Passaggio 3: Inserire i dati non elaborati

In questo passaggio i dati non elaborati vengono caricati in una tabella per renderli disponibili per un'ulteriore elaborazione. Per gestire gli asset di dati nella piattaforma Databricks, ad esempio tabelle, Databricks consiglia Unity Catalog. Tuttavia, se non si dispone delle autorizzazioni per creare il catalogo e lo schema necessari per pubblicare tabelle nel catalogo Unity, è comunque possibile completare i passaggi seguenti pubblicando le tabelle nel metastore Hive.

Per inserire dati, Databricks consiglia di usare il caricatore automatico. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo. Databricks consiglia l'inferenza dello schema quando si usa il caricatore automatico. Tuttavia, come illustrato nel passaggio di esplorazione dei dati, i dati dei brani non contengono informazioni sull'intestazione. Poiché l'intestazione non viene archiviata con i dati, è necessario definire in modo esplicito lo schema, come illustrato nell'esempio seguente.

  1. Nella barra laterale fare clic su New IconNuovo e selezionare Notebook dal menu. Verrà visualizzata la finestra di dialogo Crea notebook .

  2. Immettere un nome per il notebook, Ingest songs dataad esempio . Per impostazione predefinita:

    • Python è il linguaggio selezionato.
    • Il notebook è collegato all'ultimo cluster usato. In questo caso, il cluster creato nel passaggio 1: Creare un cluster.
  3. Immettere quanto segue nella prima cella del notebook:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Se si usa Il catalogo unity, sostituire <table-name> con un catalogo, uno schema e un nome di tabella per contenere i record inseriti , ad esempio data_pipelines.songs_data.raw_song_data. In caso contrario, sostituire <table-name> con il nome di una tabella per contenere i record inseriti, raw_song_dataad esempio .

    Sostituire <checkpoint-path> con un percorso di una directory in DBFS per gestire i file di checkpoint, /tmp/pipeline_get_started/_checkpoint/song_dataad esempio .

  4. Fare clic su Run Menue selezionare Esegui cella. Questo esempio definisce lo schema dei dati usando le informazioni di README, inserisce i dati dei brani da tutti i file contenuti in file_pathe scrive i dati nella tabella specificata da table_name.

Passaggio 4: Preparare i dati non elaborati

Per preparare i dati non elaborati per l'analisi, i passaggi seguenti trasformano i dati dei brani non elaborati filtrando le colonne non necessarie e aggiungendo un nuovo campo contenente un timestamp per la creazione del nuovo record.

  1. Nella barra laterale fare clic su New IconNuovo e selezionare Notebook dal menu. Verrà visualizzata la finestra di dialogo Crea notebook .

  2. Immettere un nome per il notebook. Ad esempio: Prepare songs data. Modificare la lingua predefinita in SQL.

  3. Immettere quanto segue nella prima cella del notebook:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Se si usa Il catalogo Unity, sostituire <table-name> con un catalogo, uno schema e un nome di tabella per contenere i record filtrati e trasformati (ad esempio, data_pipelines.songs_data.prepared_song_data). In caso contrario, sostituire <table-name> con il nome di una tabella per contenere i record filtrati e trasformati ,ad esempio prepared_song_data.

    Sostituire <raw-songs-table-name> con il nome della tabella contenente i record dei brani non elaborati inseriti nel passaggio precedente.

  4. Fare clic su Run Menue selezionare Esegui cella.

Passaggio 5: Eseguire una query sui dati trasformati

In questo passaggio si estende la pipeline di elaborazione aggiungendo query per analizzare i dati dei brani. Queste query usano i record preparati creati nel passaggio precedente.

  1. Nella barra laterale fare clic su New IconNuovo e selezionare Notebook dal menu. Verrà visualizzata la finestra di dialogo Crea notebook .

  2. Immettere un nome per il notebook. Ad esempio: Analyze songs data. Modificare la lingua predefinita in SQL.

  3. Immettere quanto segue nella prima cella del notebook:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Sostituire <prepared-songs-table-name> con il nome della tabella contenente i dati preparati. Ad esempio: data_pipelines.songs_data.prepared_song_data.

  4. Fare clic Down Caret nel menu Azioni cella, selezionare Aggiungi cella sottostante e immettere quanto segue nella nuova cella:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Sostituire <prepared-songs-table-name> con il nome della tabella preparata creata nel passaggio precedente. Ad esempio: data_pipelines.songs_data.prepared_song_data.

  5. Per eseguire le query e visualizzare l'output, fare clic su Esegui tutto.

Passaggio 6: Creare un processo di Azure Databricks per eseguire la pipeline

È possibile creare un flusso di lavoro per automatizzare l'esecuzione dei passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Azure Databricks.

  1. Nell'area di lavoro Data Science & Engineering eseguire una delle operazioni seguenti:
    • Fare clic su Jobs IconFlussi di lavoro nella barra laterale e fare clic su .Create Job Button
    • Nella barra laterale fare clic su New IconNuovo e selezionare Processo.
  2. Nella finestra di dialogo attività della scheda Attività sostituire Aggiungi un nome per il processo con il nome del processo. Ad esempio, "Flusso di lavoro Canzoni".
  3. In Nome attività immettere un nome per la prima attività, Ingest_songs_dataad esempio .
  4. In Tipo selezionare il tipo di attività Notebook .
  5. In Origine selezionare Area di lavoro.
  6. Usare il browser file per trovare il notebook di inserimento dati, fare clic sul nome del notebook e fare clic su Conferma.
  7. In Cluster selezionare Shared_job_cluster o il cluster creato nel Create a cluster passaggio.
  8. Fai clic su Crea.
  9. Fare clic Add Task Button sotto l'attività appena creata e selezionare Notebook.
  10. In Nome attività immettere un nome per l'attività, Prepare_songs_dataad esempio .
  11. In Tipo selezionare il tipo di attività Notebook .
  12. In Origine selezionare Area di lavoro.
  13. Usare il browser file per trovare il notebook di preparazione dei dati, fare clic sul nome del notebook e fare clic su Conferma.
  14. In Cluster selezionare Shared_job_cluster o il cluster creato nel Create a cluster passaggio.
  15. Fai clic su Crea.
  16. Fare clic Add Task Button sotto l'attività appena creata e selezionare Notebook.
  17. In Nome attività immettere un nome per l'attività, Analyze_songs_dataad esempio .
  18. In Tipo selezionare il tipo di attività Notebook .
  19. In Origine selezionare Area di lavoro.
  20. Usare il browser file per trovare il notebook di analisi dei dati, fare clic sul nome del notebook e fare clic su Conferma.
  21. In Cluster selezionare Shared_job_cluster o il cluster creato nel Create a cluster passaggio.
  22. Fai clic su Crea.
  23. Per eseguire il flusso di lavoro, fare clic su Run Now Button. Per visualizzare i dettagli per l'esecuzione, fare clic sul collegamento nella colonna Ora di inizio per l'esecuzione nella visualizzazione esecuzioni del processo. Fare clic su ogni attività per visualizzare i dettagli per l'esecuzione dell'attività.
  24. Per visualizzare i risultati al termine del flusso di lavoro, fare clic sull'attività di analisi dei dati finale. Viene visualizzata la pagina Output e vengono visualizzati i risultati della query.

Passaggio 7: Pianificare il processo della pipeline di dati

Nota

Per illustrare l'uso di un processo di Azure Databricks per orchestrare un flusso di lavoro pianificato, questo esempio introduttivo separa i passaggi di inserimento, preparazione e analisi in notebook separati e ogni notebook viene quindi usato per creare un'attività nel processo. Se tutta l'elaborazione è contenuta in un singolo notebook, è possibile pianificare facilmente il notebook direttamente dall'interfaccia utente del notebook di Azure Databricks. Vedere Creare e gestire processi di notebook pianificati.

Un requisito comune consiste nell'eseguire una pipeline di dati su base pianificata. Per definire una pianificazione per il processo che esegue la pipeline:

  1. Fare clic su Jobs IconFlussi di lavoro nella barra laterale.
  2. Nella colonna Nome fare clic sul nome del processo. Nel pannello laterale vengono visualizzati i dettagli del processo.
  3. Fare clic su Aggiungi trigger nel pannello Dettagli processo e selezionare Pianificato in Tipo di trigger.
  4. Specificare il periodo, l'ora di inizio e il fuso orario. Facoltativamente, selezionare la casella di controllo Mostra sintassi Cron per visualizzare e modificare la pianificazione nella sintassi Cron di Quarzi.
  5. Fare clic su Salva.

Altre informazioni