Condividi tramite


Esercitazione: Creare la prima pipeline usando l'editor di pipeline Lakeflow

Informazioni su come creare una nuova pipeline usando le pipeline dichiarative di Lakeflow Spark (SDP) per l'orchestrazione dei dati e il caricatore automatico. Questa esercitazione estende la pipeline di esempio pulendo i dati e creando una query per trovare i primi 100 utenti.

In questa esercitazione si apprenderà come usare l'editor di Pipelines Lakeflow per:

  • Creare una nuova pipeline con la struttura di cartelle predefinita e iniziare con un set di file di esempio.
  • Definire vincoli di qualità dei dati usando le aspettative.
  • Usare le funzionalità dell'editor per estendere la pipeline con una nuova trasformazione per eseguire analisi sui dati.

Requisiti

Prima di iniziare questa esercitazione, è necessario:

  • Accedere a un'area di lavoro di Azure Databricks.
  • Abilitare Unity Catalog per l'area di lavoro.
  • Assicurarsi che l'editor di pipeline Lakeflow sia abilitato per l'area di lavoro e bisogna aver aderito esplicitamente. Vedere Abilitare l'editor delle pipeline di Lakeflow e il monitoraggio aggiornato.
  • Disporre dell'autorizzazione per creare una risorsa di calcolo o accedere a una risorsa di calcolo.
  • Disporre delle autorizzazioni per creare un nuovo schema in un catalogo. Le autorizzazioni necessarie sono ALL PRIVILEGES o USE CATALOG e CREATE SCHEMA.

Passaggio 1: creare una pipeline

In questo passaggio si crea una pipeline usando la struttura di cartelle predefinita e gli esempi di codice. Gli esempi di codice fanno riferimento alla tabella users nei dati di esempio wanderbricks.

  1. Nell'area di lavoro di Azure Databricks fare clic sull'icona Più.Nuovo, quindi sull'icona della pipeline.pipeline ETL. Si apre l'editor della pipeline nella sezione "Crea una pipeline".

  2. Fare clic sull'intestazione per assegnare un nome alla pipeline.

  3. Appena sotto il nome, scegliere il catalogo predefinito e lo schema per le tabelle di output. Questi vengono usati quando non si specifica un catalogo e uno schema nelle definizioni della pipeline.

  4. In Passaggio successivo per la pipeline fare clic sull'icona Schema.Iniziare con il codice di esempio nell'icona SQL o Schema.Iniziare con il codice di esempio in Python, in base alle preferenze del linguaggio. In questo modo viene modificata la lingua predefinita per il codice di esempio, ma è possibile aggiungere codice nell'altra lingua in un secondo momento. Verrà creata una struttura di cartelle predefinita con codice di esempio per iniziare.

  5. È possibile visualizzare il codice di esempio nel browser asset della pipeline sul lato sinistro dell'area di lavoro. In transformations sono presenti due file che generano un set di dati della pipeline ciascuno. Sotto explorations c'è un notebook con codice che consente di visualizzare l'output della tua pipeline. Facendo clic su un file è possibile visualizzare e modificare il codice nell'editor.

    I set di dati di output non sono ancora stati creati e il grafico Pipeline a destra della schermata è vuoto.

  6. Per eseguire il codice della pipeline (il codice nella transformations cartella), fare clic su Esegui pipeline nella parte superiore destra della schermata.

    Al termine dell'esecuzione, nella parte inferiore dell'area di lavoro vengono visualizzate le due nuove tabelle che sono state create, sample_users_<pipeline-name> e sample_aggregation_<pipeline-name>. È anche possibile osservare che il grafico della Pipeline a destra dell'area di lavoro ora mostra le due tabelle, incluso che sample_users è l'origine di sample_aggregation.

Passaggio 2: Applicare i controlli di qualità dei dati

In questo passaggio si aggiunge un controllo qualità dei dati alla sample_users tabella. Si usano le condizioni della pipeline per vincolare i dati. In questo caso, si eliminano tutti i record utente che non dispongono di un indirizzo di posta elettronica valido e si restituisce la tabella pulita come users_cleaned.

  1. Nel navigatore risorse della pipeline, fare clic sull'icona Più e selezionare Trasformazione.

  2. Nella finestra di dialogo Crea nuovo file di trasformazione effettuare le selezioni seguenti:

    • Scegliere Python o SQL per il linguaggio. Non è necessario che corrisponda alla selezione precedente.
    • Assegnare un nome al file. In questo caso scegliere users_cleaned.
    • Per Percorso di destinazione lasciare il valore predefinito.
    • Per Tipo di set di datilasciare selezionato Nessuno oppure scegliere Visualizzazione materializzata. Se si seleziona Visualizzazione materializzata, viene generato automaticamente il codice di esempio.
  3. Nel nuovo file di codice modificare il codice in modo che corrisponda al seguente (usare SQL o Python, in base alla selezione nella schermata precedente). Sostituire <pipeline-name> con il nome completo per la sample_users tabella.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Pitone

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Fare clic su Esegui pipeline per aggiornare la pipeline. Ora dovrebbe avere tre tabelle.

Passaggio 3: Analizzare gli utenti principali

Ottenere quindi i primi 100 utenti in base al numero di prenotazioni create. Unire la wanderbricks.bookings tabella alla users_cleaned vista materializzata.

  1. Nel navigatore risorse della pipeline, fare clic sull'icona Più e selezionare Trasformazione.

  2. Nella finestra di dialogo Crea nuovo file di trasformazione effettuare le selezioni seguenti:

    • Scegliere Python o SQL per il linguaggio. Non è necessario che corrispondano alle selezioni precedenti.
    • Assegnare un nome al file. In questo caso scegliere users_and_bookings.
    • Per Percorso di destinazione lasciare il valore predefinito.
    • Per Tipo di set di datilasciare selezionato Nessuno.
  3. Nel nuovo file di codice modificare il codice in modo che corrisponda al seguente (usare SQL o Python, in base alla selezione nella schermata precedente).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Pitone

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Fare clic su Esegui pipeline per aggiornare i set di dati. Al termine dell'esecuzione, è possibile vedere in Pipeline Graph che sono presenti quattro tabelle, inclusa la nuova users_and_bookings tabella.

    Grafico della pipeline che mostra quattro tabelle nella pipeline

Passaggi successivi

Ora che si è appreso come usare alcune delle funzionalità dell'editor di pipeline Lakeflow e come creare una pipeline, ecco alcune altre funzionalità per altre informazioni: