Condividi tramite


Esercitazione: Costruire un'ETL con le pipeline dichiarative di Lakeflow Spark

Questa esercitazione illustra come creare e distribuire una pipeline ETL (estrazione, trasformazione e caricamento) per l'orchestrazione dei dati usando le pipeline dichiarative di Lakeflow Spark e il caricatore automatico. Una pipeline ETL implementa i passaggi per leggere i dati dai sistemi di origine, trasformare tali dati in base ai requisiti, ad esempio i controlli della qualità dei dati e registrare la deduplicazione dei dati e scrivere i dati in un sistema di destinazione, ad esempio un data warehouse o un data lake.

In questa esercitazione si useranno le pipeline e il caricatore automatico per:

  • Inserire dati di origine non elaborati in una tabella di destinazione.
  • Trasformare i dati di origine non elaborati e scrivere i dati trasformati in due viste materializzate di destinazione.
  • Interrogare i dati trasformati.
  • Automatizzare la pipeline ETL con un lavoro di Databricks.

Per altre informazioni sulle pipeline e sul caricatore automatico, vedere Pipeline dichiarative di Lakeflow Spark e Che cos'è il caricatore automatico?

Requisiti

Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:

Informazioni sul set di dati

Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.

Passaggio 1: Creare una pipeline

Creare prima di tutto una pipeline definendo i set di dati nei file (denominato codice sorgente) usando la sintassi della pipeline. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più file specifici della lingua nella pipeline. Per altre informazioni, vedere Pipeline dichiarative di Lakeflow Spark

Questo tutorial usa il calcolo serverless e Unity Catalog. Per tutte le opzioni di configurazione non specificate, usare le impostazioni predefinite. Se l'ambiente di calcolo serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite.

Per creare una nuova pipeline, seguire questa procedura:

  1. Nell'area di lavoro fare clic sull'icona Più.Novità nella barra laterale, quindi selezionare Pipeline ETL.
  2. Dai alla pipeline un nome univoco.
  3. Appena sotto il nome, selezionare il catalogo predefinito e lo schema per i dati generati. È possibile specificare altre destinazioni nelle trasformazioni, ma questa esercitazione usa queste impostazioni predefinite. È necessario disporre delle autorizzazioni per il catalogo e lo schema creati. Vedere Requisiti.
  4. Per questa esercitazione selezionare Inizia con un file vuoto.
  5. In Percorso cartella specificare un percorso per i file di origine o accettare l'impostazione predefinita (cartella utente).
  6. Scegliere Python o SQL come linguaggio per il primo file di origine (una pipeline può combinare e trovare le lingue corrispondenti, ma ogni file deve trovarsi in un unico linguaggio).
  7. Fare clic su Seleziona.

Viene visualizzato l'editor della pipeline per la nuova pipeline. Viene creato un file di origine vuoto per la lingua, pronto per la prima trasformazione.

Passaggio 2: Sviluppare la logica della pipeline

In questo passaggio si userà l'editor di Pipelines Lakeflow per sviluppare e convalidare il codice sorgente per la pipeline in modo interattivo.

Il codice usa il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud. Per altre informazioni, vedere Che cos'è il caricatore automatico?

Un file di codice sorgente vuoto viene creato e configurato automaticamente per la pipeline. Il file viene creato nella cartella trasformazioni della pipeline. Per impostazione predefinita, tutti i file *.py e *.sql nella cartella delle trasformazioni sono inclusi nella sorgente della tua pipeline.

  1. Copiare e incollare il codice seguente nel file di origine. Assicurarsi di usare la lingua selezionata per il file nel passaggio 1.

    Pitone

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Questa fonte include il codice per tre interrogazioni. È anche possibile inserire tali query in file separati per organizzare i file e scrivere il codice nel modo preferito.

  2. Fare clic sull'icona Riproduci.Eseguire il file o Eseguire la pipeline per avviare l'aggiornamento della pipeline connessa. Con un solo file di origine nella pipeline, questi sono equivalenti a livello funzionale.

Al termine dell'aggiornamento, l'editor viene aggiornato con informazioni sulla pipeline.

  • Il grafico della pipeline (DAG), nella barra laterale a destra del codice, mostra tre tabelle, songs_raw, songs_preparede top_artists_by_year.
  • Nella parte superiore del browser degli asset della pipeline viene visualizzato un riepilogo dell'aggiornamento.
  • I dettagli delle tabelle generate vengono visualizzati nel riquadro inferiore ed è possibile esplorare i dati dalle tabelle selezionando uno.

Sono inclusi i dati non elaborati e puliti, nonché alcune semplici analisi per trovare i migliori artisti per anno. Nel passaggio successivo si creano query ad hoc per un'ulteriore analisi in un file separato nella pipeline.

Passaggio 3: Esplorare i set di dati creati dalla pipeline

In questo passaggio si eseguono query ad hoc sui dati elaborati nella pipeline ETL per analizzare i dati del brano nell'editor SQL di Databricks. Queste query usano i record preparati creati nel passaggio precedente.

Prima di tutto, eseguire una query che trova gli artisti che hanno pubblicato la maggior parte delle canzoni ogni anno dal 1990.

  1. Nella barra laterale del browser asset della pipeline fare clic sull'icona Più.Aggiungere quindi Esplorazione.

  2. Immettere un nome e selezionare SQL per il file di esplorazione. Un notebook SQL viene creato in una nuova explorations cartella. I file nella explorations cartella non vengono eseguiti come parte di un aggiornamento della pipeline per impostazione predefinita. Il notebook SQL include celle che è possibile eseguire insieme o separatamente.

  3. Per creare una tabella di artisti che rilasciano la maggior parte dei brani di ogni anno dopo il 1990, immettere il codice seguente nel nuovo file SQL (se nel file è presente codice di esempio, sostituirlo). Poiché questo notebook non fa parte della pipeline, non usa il catalogo e lo schema predefiniti. Sostituire <catalog>.<schema> con il catalogo e lo schema usati come valori predefiniti per la pipeline.

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Fare clic sull'icona Riproduci oppure premere Shift + Enter per eseguire questa query.

Ora, esegui un'altra query che trova canzoni con un ritmo 4/4 e un tempo adatto al ballo.

  1. Aggiungere il codice seguente alla cella successiva nello stesso file. Anche in questo caso, sostituire <catalog>.<schema> con il catalogo e lo schema usati come valori predefiniti per la pipeline:

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Fare clic sull'icona Riproduci oppure premere Shift + Enter per eseguire questa query.

Passaggio 4: Creare un'attività per eseguire la pipeline

Creare quindi un flusso di lavoro per automatizzare i passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Databricks eseguito in base a una pianificazione.

  1. Nella parte superiore dell'editor scegliere il pulsante Pianifica .
  2. Se viene visualizzata la finestra di dialogo Pianificazioni , scegliere Aggiungi pianificazione.
  3. Verrà visualizzata la finestra di dialogo Nuova pianificazione , in cui è possibile creare un processo per eseguire la pipeline in base a una pianificazione.
  4. Facoltativamente, dare un nome al job.
  5. Per impostazione predefinita, la pianificazione è impostata per l'esecuzione una volta al giorno. Puoi accettare questo predefinito o impostare un programma personalizzato. Scegliendo Avanzate è possibile impostare un orario specifico in cui verrà eseguito il processo. Se si seleziona Altre opzioni , è possibile creare notifiche durante l'esecuzione del processo.
  6. Seleziona Crea per applicare le modifiche e creare l'attività.

Ora l'attività verrà eseguita ogni giorno per mantenere aggiornata la pipeline. È possibile scegliere di nuovo Pianifica per visualizzare l'elenco delle pianificazioni. È possibile gestire le pianificazioni per la pipeline da tale finestra di dialogo, inclusa l'aggiunta, la modifica o la rimozione di pianificazioni.

Facendo clic sul nome del programma (o del processo), si accede alla pagina del processo nell'elenco Processi e pipeline. Da qui è possibile visualizzare i dettagli sulle esecuzioni dei processi, inclusa la cronologia delle esecuzioni o eseguire immediatamente il processo con il pulsante Esegui adesso .

Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.

Altre informazioni