Condividi tramite


Esercitazione: Creare una pipeline ETL con le pipeline dichiarative di Lakeflow

Informazioni su come creare e distribuire una pipeline ETL (estrazione, trasformazione e caricamento) per l'orchestrazione dei dati usando le pipeline dichiarative di Lakeflow e il caricatore automatico. Una pipeline ETL implementa i passaggi per leggere i dati dai sistemi di origine, trasformare tali dati in base ai requisiti, ad esempio i controlli della qualità dei dati e registrare la deduplicazione dei dati e scrivere i dati in un sistema di destinazione, ad esempio un data warehouse o un data lake.

In questa esercitazione si useranno le pipeline dichiarative di Lakeflow e il caricatore automatico per:

  • Inserire dati di origine non elaborati in una tabella di destinazione.
  • Trasformare i dati di origine non elaborati e scrivere i dati trasformati in due viste materializzate di destinazione.
  • Interrogare i dati trasformati.
  • Automatizzare la pipeline ETL con un lavoro di Databricks.

Per altre informazioni sulle pipeline dichiarative di Lakeflow e sul caricatore automatico, vedere Pipeline dichiarative di Lakeflow e Che cos'è il caricatore automatico?

Requisiti

Per completare questa esercitazione, è necessario soddisfare i requisiti seguenti:

Informazioni sul set di dati

Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.

Passaggio 1: Creare una pipeline

Per prima cosa, si creerà una pipeline ETL in Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines crea pipeline risolvendo le dipendenze definite nei notebook o nei file (denominati codice sorgente) usando la sintassi delle pipeline dichiarative di Lakeflow. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più notebook o file specifici della lingua nella pipeline. Per altre informazioni, vedere Pipeline dichiarative di Lakeflow

Importante

Lasciare vuoto il campo Codice sorgente per creare e configurare un notebook per la creazione automatica del codice sorgente.

Questo tutorial usa il calcolo serverless e Unity Catalog. Per tutte le opzioni di configurazione non specificate, usare le impostazioni predefinite. Se l'ambiente di calcolo serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite. Se si usano le impostazioni di calcolo predefinite, è necessario selezionare manualmente Il catalogo Unity in Opzioni di archiviazione nella sezione Destinazione dell'interfaccia utente crea pipeline .

Per creare una nuova pipeline ETL in Pipeline Dichiarative di Lakeflow, seguire questi passaggi:

  1. Nell'area di lavoro fare clic sull'icona Flussi di lavoro.Processi e pipeline nella barra laterale.
  2. In Nuovo fare clic su Pipeline ETL.
  3. In il nome pipeline, digita un nome di pipeline univoco.
  4. Selezionare la casella di controllo Serverless.
  5. In Destinazione, per configurare un percorso del catalogo Unity in cui vengono pubblicate le tabelle, selezionare un catalogo esistente e scrivere un nuovo nome in Schema per creare un nuovo schema nel catalogo.
  6. Fare clic su Crea.

Viene visualizzata l'interfaccia utente della pipeline per la nuova pipeline.

Passaggio 2: Sviluppare una pipeline

Importante

I notebook possono contenere solo un singolo linguaggio di programmazione. Non mischiare il codice Python e SQL nei notebook del codice sorgente della pipeline.

In questo passaggio si useranno i notebook di Databricks per sviluppare e convalidare il codice sorgente per le pipeline dichiarative di Lakeflow in modo interattivo.

Il codice usa il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud. Per altre informazioni, vedere Che cos'è il caricatore automatico?

Un notebook di codice sorgente vuoto viene creato e configurato automaticamente per la pipeline. Il notebook viene creato in una nuova directory nella tua directory utente. Il nome della nuova directory e quello del file corrispondono al nome della pipeline. Ad esempio: /Users/someone@example.com/my_pipeline/my_pipeline.

Quando si sviluppa una pipeline, è possibile scegliere Python o SQL. Sono inclusi esempi per entrambe le lingue. In base alla lingua scelta, verificare di selezionare la lingua predefinita del notebook. Per altre informazioni sul supporto dei notebook per lo sviluppo di codice di Pipeline dichiarative di Lakeflow, vedere Sviluppare ed eseguire il debug di pipeline ETL con un notebook in Pipeline dichiarative di Lakeflow.

  1. Un collegamento per accedere a questo notebook si trova nel campo codice sorgente nel pannello dettagli del pipeline. Fare clic sul collegamento per aprire il notebook prima di procedere al passaggio successivo.

  2. Fare clic su Connetti in alto a destra per aprire il menu di configurazione delle risorse di calcolo.

  3. Passare il puntatore del mouse sul nome della pipeline creata nel passaggio 1.

  4. Fare clic su Connetti.

  5. Accanto al titolo del notebook nella parte superiore selezionare il linguaggio predefinito del notebook (Python o SQL).

  6. Copiare e incollare il codice seguente in una cella del notebook.

    Pitone

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Fare clic su Start per avviare un aggiornamento per la pipeline connessa.

Passaggio 3: Eseguire una query sui dati trasformati

In questo passaggio si interrogheranno i dati elaborati nella pipeline di ETL per analizzare i dati relativi al brano. Queste query usano i record preparati creati nel passaggio precedente.

Prima di tutto, eseguire una query che trova gli artisti che hanno pubblicato la maggior parte delle canzoni ogni anno dal 1990.

  1. Nella barra laterale fare clic su Icona Editor SQLSQL Editor.

  2. Fare clic sull'icona Aggiungi o più della nuova scheda e selezionare Crea nuova query dal menu.

  3. Immetti gli elementi seguenti:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Sostituire <catalog> e <schema> con il nome del catalogo e dello schema in cui si trova la tabella. Ad esempio: data_pipelines.songs_data.top_artists_by_year.

  4. Fare clic su Esegui selezionato.

Ora, esegui un'altra query che trova canzoni con un ritmo 4/4 e un tempo adatto al ballo.

  1. Fare clic sull'icona Aggiungi o più nuovo tocco e selezionare Crea nuova query dal menu.

  2. Immettere il codice seguente:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Sostituire <catalog> e <schema> con il nome del catalogo e dello schema in cui si trova la tabella. Ad esempio: data_pipelines.songs_data.songs_prepared.

  3. Fare clic su Esegui selezionato.

Passaggio 4: Creare un'attività per eseguire la pipeline

Creare quindi un flusso di lavoro per automatizzare i passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Databricks.

  1. Nell'area di lavoro fare clic sull'icona Flussi di lavoro.Processi e pipeline nella barra laterale.
  2. In Nuovo fare clic su Lavoro.
  3. Nella casella titolo attività, sostituisci Nuovo Lavoro <data e ora> con il tuo nome lavoro. Ad esempio: Songs workflow.
  4. In Nome attività immettere un nome per la prima attività, ETL_songs_dataad esempio .
  5. In Tipo selezionare Pipeline.
  6. In Pipeline selezionare la pipeline creata nel passaggio 1.
  7. Fare clic su Crea.
  8. Per eseguire il flusso di lavoro, fare clic su Esegui ora. Per visualizzare i dettagli dell'esecuzione, fare clic sulla scheda Esecuzioni. Fare clic sul compito per visualizzare i dettagli dell'esecuzione del compito.
  9. Per visualizzare i risultati al termine del flusso di lavoro, fare clic su Vai all'ultima esecuzione riuscita o all'ora di inizio per l'esecuzione del processo. Viene visualizzata la pagina Output e vengono visualizzati i risultati della query.

Vedere Monitoraggio e osservabilità per i processi Lakeflow per ulteriori informazioni sulle esecuzioni dei processi.

Passaggio 5: Pianificare l'esecuzione della pipeline

Per eseguire la pipeline ETL in base a una pianificazione, seguire questa procedura:

  1. Passare all'interfaccia utente Processi e pipeline nella stessa area di lavoro di Azure Databricks del processo.
  2. Opzionalmente, selezionare i filtri Attività e Di mia proprietà.
  3. Nella colonna Nome, fare clic sul nome del processo. Nel pannello laterale sono visualizzati i dettagli del lavoro.
  4. Fare clic su Aggiungi trigger nel pannello Pianificazioni e trigger e selezionare Pianificato in Tipo di trigger.
  5. Specificare il periodo, l'ora di inizio e il fuso orario.
  6. Fare clic su Salva.

Altre informazioni