Dela via


Självstudie: Skapa din första pipeline med Lakeflow Pipelines-redigeraren

Lär dig hur du skapar en ny pipeline genom att använda Lakeflow Spark Deklarativa Pipelines (SDP) för dataorkestrering och Auto Loader. Den här handledningen utökar exempelpipelinen genom att rensa data och skapa en fråga för att hitta de 100 bästa användarna.

I den här självstudien lär du dig hur du använder Lakeflow Pipelines-redigeraren för att:

  • Skapa en ny pipeline med standardmappstrukturen och börja med en uppsättning exempelfiler.
  • Definiera datakvalitetsbegränsningar med hjälp av förväntningar.
  • Använd redigeringsfunktionerna för att utöka pipelinen med en ny transformering för att utföra analys av dina data.

Kravspecifikation

Innan du påbörjar den här självstudien måste du:

  • Loggas in på en Azure Databricks-arbetsyta.
  • Låt Unity Catalog vara aktiverat för din arbetsyta.
  • Låt Lakeflow-pipelinesredigeraren vara aktiverad för din arbetsyta och du måste vara anmäld. Se Aktivera Lakeflow Pipelines-redigeraren och uppdaterad övervakning.
  • Ha behörighet att skapa en beräkningsresurs eller åtkomst till en beräkningsresurs.
  • Ha behörighet att skapa ett nytt schema i en katalog. De behörigheter som krävs är ALL PRIVILEGES eller USE CATALOG och CREATE SCHEMA.

Steg 1: Skapa en pipeline

I det här steget skapar du en pipeline med hjälp av standardmappstrukturen och kodexempel. Kodexemplen refererar till users tabellen i wanderbricks exempeldatakällan.

  1. På din Azure Databricks-arbetsyta klickar du på plusikonen.Ny och sedan pipelineikon.ETL-pipeline. Då öppnas pipelineredigeraren på sidan Skapa en pipeline.

  2. Klicka på rubriken för att ge pipelinen ett namn.

  3. Precis under namnet väljer du standardkatalogen och schemat för dina utdatatabeller. Dessa används när du inte anger en katalog och ett schema i pipelinedefinitionerna.

  4. Under Nästa steg för pipelinen klickar du på ikonen Schema.Börja med exempelkod i SQL - eller schemaikonen.Börja med exempelkod i Python, baserat på dina språkinställningar. Detta ändrar standardspråket för exempelkoden, men du kan lägga till kod på det andra språket senare. Detta skapar en standardmappstruktur med exempelkod för att komma igång.

  5. Du kan visa exempelkoden i pipeline-resursöversikten till vänster i arbetsytan. Under transformations finns två filer som genererar en pipelinedatauppsättning vardera. Under explorations finns en notebook-fil som har kod som hjälper dig att visa utdata från din pipeline. Genom att klicka på en fil kan du visa och redigera koden i redigeraren.

    Utdatauppsättningarna har ännu inte skapats och pipelinediagrammet till höger på skärmen är tomt.

  6. Om du vill köra pipelinekoden (koden i transformations mappen) klickar du på Kör pipeline i den övre högra delen av skärmen.

    När körningen är klar visar den nedre delen av arbetsytan de två nya tabellerna som skapades: sample_users_<pipeline-name> och sample_aggregation_<pipeline-name>. Du kan också se att pipelinediagrammet till höger på arbetsytan nu visar de två tabellerna, inklusive att sample_users är källan för sample_aggregation.

Steg 2: Tillämpa datakvalitetskontroller

I det här steget lägger du till en datakvalitetskontroll i sample_users tabellen. Du använder pipeline-förväntningar för att begränsa data. I det här fallet tar du bort alla användarposter som inte har en giltig e-postadress och matar ut den rensade tabellen som users_cleaned.

  1. I pipelinens tillgångswebbläsare klickar du på Plus-ikonen., och väljer Transformering.

  2. I dialogrutan Skapa ny transformeringsfil gör du följande val:

    • Välj antingen Python eller SQL som språk. Detta behöver inte matcha ditt tidigare val.
    • Ge filen ett namn. I det här fallet väljer du users_cleaned.
    • För Målsökväg lämnar du standardvärdet.
    • För Datauppsättningstyp lämnar du den antingen som Ingen markerad eller väljer Materialiserad vy. Om du väljer Materialiserad vy genererar den exempelkod åt dig.
  3. I den nya kodfilen redigerar du koden så att den matchar följande (använd SQL eller Python, baserat på ditt val på föregående skärm). Ersätt <pipeline-name> med det fullständiga namnet på tabellen sample_users .

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Klicka på Kör pipeline för att uppdatera pipelinen. Den bör nu ha tre tabeller.

Steg 3: Analysera de främsta användarna

Få sedan de 100 bästa användarna efter antalet bokningar som har skapats. Koppla tabellen wanderbricks.bookings till den users_cleaned materialiserade vyn.

  1. I pipelinens tillgångswebbläsare klickar du på Plus-ikonen., och väljer Transformering.

  2. I dialogrutan Skapa ny transformeringsfil gör du följande val:

    • Välj antingen Python eller SQL som språk. Detta behöver inte matcha dina tidigare val.
    • Ge filen ett namn. I det här fallet väljer du users_and_bookings.
    • För Målsökväg lämnar du standardvärdet.
    • För Datauppsättningstyp låter du det vara som Inget valt.
  3. I den nya kodfilen redigerar du koden så att den matchar följande (använd SQL eller Python, baserat på ditt val på föregående skärm).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Klicka på Kör pipeline för att uppdatera datamängderna. När körningen är klar kan du se i Pipeline Graph att det finns fyra tabeller, inklusive den nya users_and_bookings tabellen.

    Pipelinediagram som visar fyra tabeller i pipeline

Nästa steg

Nu när du har lärt dig hur du använder några av funktionerna i Lakeflow Pipelines-redigeraren och skapat en pipeline, finns här några andra funktioner för att lära dig mer om: