Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Lär dig hur du skapar en ny pipeline genom att använda Lakeflow Spark Deklarativa Pipelines (SDP) för dataorkestrering och Auto Loader. Den här handledningen utökar exempelpipelinen genom att rensa data och skapa en fråga för att hitta de 100 bästa användarna.
I den här självstudien lär du dig hur du använder Lakeflow Pipelines-redigeraren för att:
- Skapa en ny pipeline med standardmappstrukturen och börja med en uppsättning exempelfiler.
- Definiera datakvalitetsbegränsningar med hjälp av förväntningar.
- Använd redigeringsfunktionerna för att utöka pipelinen med en ny transformering för att utföra analys av dina data.
Kravspecifikation
Innan du påbörjar den här självstudien måste du:
- Loggas in på en Azure Databricks-arbetsyta.
- Låt Unity Catalog vara aktiverat för din arbetsyta.
- Låt Lakeflow-pipelinesredigeraren vara aktiverad för din arbetsyta och du måste vara anmäld. Se Aktivera Lakeflow Pipelines-redigeraren och uppdaterad övervakning.
- Ha behörighet att skapa en beräkningsresurs eller åtkomst till en beräkningsresurs.
- Ha behörighet att skapa ett nytt schema i en katalog. De behörigheter som krävs är
ALL PRIVILEGESellerUSE CATALOGochCREATE SCHEMA.
Steg 1: Skapa en pipeline
I det här steget skapar du en pipeline med hjälp av standardmappstrukturen och kodexempel. Kodexemplen refererar till users tabellen i wanderbricks exempeldatakällan.
På din Azure Databricks-arbetsyta klickar du på
Ny och sedan
ETL-pipeline. Då öppnas pipelineredigeraren på sidan Skapa en pipeline.
Klicka på rubriken för att ge pipelinen ett namn.
Precis under namnet väljer du standardkatalogen och schemat för dina utdatatabeller. Dessa används när du inte anger en katalog och ett schema i pipelinedefinitionerna.
Under Nästa steg för pipelinen klickar du på
Börja med exempelkod i SQL - eller
Börja med exempelkod i Python, baserat på dina språkinställningar. Detta ändrar standardspråket för exempelkoden, men du kan lägga till kod på det andra språket senare. Detta skapar en standardmappstruktur med exempelkod för att komma igång.
Du kan visa exempelkoden i pipeline-resursöversikten till vänster i arbetsytan. Under
transformationsfinns två filer som genererar en pipelinedatauppsättning vardera. Underexplorationsfinns en notebook-fil som har kod som hjälper dig att visa utdata från din pipeline. Genom att klicka på en fil kan du visa och redigera koden i redigeraren.Utdatauppsättningarna har ännu inte skapats och pipelinediagrammet till höger på skärmen är tomt.
Om du vill köra pipelinekoden (koden i
transformationsmappen) klickar du på Kör pipeline i den övre högra delen av skärmen.När körningen är klar visar den nedre delen av arbetsytan de två nya tabellerna som skapades:
sample_users_<pipeline-name>ochsample_aggregation_<pipeline-name>. Du kan också se att pipelinediagrammet till höger på arbetsytan nu visar de två tabellerna, inklusive attsample_usersär källan försample_aggregation.
Steg 2: Tillämpa datakvalitetskontroller
I det här steget lägger du till en datakvalitetskontroll i sample_users tabellen. Du använder pipeline-förväntningar för att begränsa data. I det här fallet tar du bort alla användarposter som inte har en giltig e-postadress och matar ut den rensade tabellen som users_cleaned.
I pipelinens tillgångswebbläsare klickar du på
, och väljer Transformering.
I dialogrutan Skapa ny transformeringsfil gör du följande val:
- Välj antingen Python eller SQL som språk. Detta behöver inte matcha ditt tidigare val.
- Ge filen ett namn. I det här fallet väljer du
users_cleaned. - För Målsökväg lämnar du standardvärdet.
- För Datauppsättningstyp lämnar du den antingen som Ingen markerad eller väljer Materialiserad vy. Om du väljer Materialiserad vy genererar den exempelkod åt dig.
I den nya kodfilen redigerar du koden så att den matchar följande (använd SQL eller Python, baserat på ditt val på föregående skärm). Ersätt
<pipeline-name>med det fullständiga namnet på tabellensample_users.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;python
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Klicka på Kör pipeline för att uppdatera pipelinen. Den bör nu ha tre tabeller.
Steg 3: Analysera de främsta användarna
Få sedan de 100 bästa användarna efter antalet bokningar som har skapats. Koppla tabellen wanderbricks.bookings till den users_cleaned materialiserade vyn.
I pipelinens tillgångswebbläsare klickar du på
, och väljer Transformering.
I dialogrutan Skapa ny transformeringsfil gör du följande val:
- Välj antingen Python eller SQL som språk. Detta behöver inte matcha dina tidigare val.
- Ge filen ett namn. I det här fallet väljer du
users_and_bookings. - För Målsökväg lämnar du standardvärdet.
- För Datauppsättningstyp låter du det vara som Inget valt.
I den nya kodfilen redigerar du koden så att den matchar följande (använd SQL eller Python, baserat på ditt val på föregående skärm).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;python
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Klicka på Kör pipeline för att uppdatera datamängderna. När körningen är klar kan du se i Pipeline Graph att det finns fyra tabeller, inklusive den nya
users_and_bookingstabellen.
Nästa steg
Nu när du har lärt dig hur du använder några av funktionerna i Lakeflow Pipelines-redigeraren och skapat en pipeline, finns här några andra funktioner för att lära dig mer om:
Verktyg för att arbeta med och felsöka transformeringar när du skapar pipelines:
- Selektiv exekvering
- Dataförhandsgranskningar
- Interaktiv DAG (diagram över datauppsättningarna i din pipeline)
Inbyggd Databricks Asset Bundles-integrering för effektivt samarbete, versionskontroll och CI/CD-integrering direkt från redigeraren: