Dela via


Självstudie: Köra en lakehouse-analyspipeline från slutpunkt till slutpunkt

Den här självstudien visar hur du konfigurerar en analyspipeline från slutpunkt till slutpunkt för ett Azure Databricks lakehouse.

Viktigt!

I den här självstudien används interaktiva notebook-filer för att slutföra vanliga ETL-uppgifter i Python i Unity Catalog-aktiverade kluster. Om du inte använder Unity Catalog läser du Kör din första ETL-arbetsbelastning på Azure Databricks.

Uppgifter i den här självstudien

I slutet av den här artikeln kommer du att känna dig bekväm:

  1. Starta ett Unity Catalog-aktiverat beräkningskluster.
  2. Skapa en Databricks-anteckningsbok.
  3. Skriva och läsa data från en extern plats i Unity Catalog.
  4. Konfigurera inkrementell datainmatning till en Unity Catalog-tabell med automatisk inläsning.
  5. Köra notebook-celler för att bearbeta, fråga och förhandsgranska data.
  6. Schemalägga en notebook-fil som ett Databricks-jobb.
  7. Köra frågor mot Unity Catalog-tabeller från Databricks SQL

Azure Databricks tillhandahåller en uppsättning produktionsklara verktyg som gör det möjligt för dataproffs att snabbt utveckla och distribuera ETL-pipelines (extrahering, transformering och inläsning). Med Unity Catalog kan dataförvaltare konfigurera och skydda autentiseringsuppgifter för lagring, externa platser och databasobjekt för användare i hela organisationen. Med Databricks SQL kan analytiker köra SQL-frågor mot samma tabeller som används i ETL-arbetsbelastningar i produktion, vilket möjliggör business intelligence i realtid i stor skala.

Du kan också använda Delta Live Tables för att skapa ETL-pipelines. Databricks skapade Delta Live Tables för att minska komplexiteten med att skapa, distribuera och underhålla ETL-pipelines för produktion. Se Självstudie: Kör din första Delta Live Tables-pipeline.

Behov

Kommentar

Om du inte har behörighet för klusterkontroll kan du fortfarande slutföra de flesta av stegen nedan så länge du har åtkomst till ett kluster.

Steg 1: Skapa ett kluster

Om du vill utföra undersökande dataanalys och datateknik skapar du ett kluster för att tillhandahålla de beräkningsresurser som krävs för att köra kommandon.

  1. Klicka på beräkningsikon Beräkna i sidofältet.
  2. Klicka på Ny ikon Nytt i sidofältet och välj sedan Kluster. Då öppnas sidan Nytt kluster/beräkning.
  3. Ange ett unikt namn för klustret.
  4. Välj alternativknappen Enskild nod.
  5. Välj Enskild användare i listrutan Åtkomstläge .
  6. Kontrollera att din e-postadress visas i fältet Enskild användare .
  7. Välj önskad Databricks-körningsversion, 11.1 eller senare för att använda Unity Catalog.
  8. Klicka på Skapa beräkning för att skapa klustret.

Mer information om Databricks-kluster finns i Beräkning.

Steg 2: Skapa en Databricks-notebook-fil

Om du vill skapa en notebook-fil på arbetsytan klickar du på Ny ikon Nytt i sidofältet och klickar sedan på Notebook. En tom anteckningsbok öppnas på arbetsytan.

Mer information om hur du skapar och hanterar notebook-filer finns i Hantera notebook-filer.

Steg 3: Skriva och läsa data från en extern plats som hanteras av Unity Catalog

Databricks rekommenderar att du använder automatisk inläsning för inkrementell datainmatning. Automatisk inläsning identifierar och bearbetar automatiskt nya filer när de tas emot i molnobjektlagring.

Använd Unity Catalog för att hantera säker åtkomst till externa platser. Användare eller tjänstens huvudnamn med READ FILES behörigheter på en extern plats kan använda Automatisk inläsning för att mata in data.

Normalt anländer data till en extern plats på grund av skrivningar från andra system. I den här demonstrationen kan du simulera data ankomst genom att skriva ut JSON-filer till en extern plats.

Kopiera koden nedan till en notebook-cell. Ersätt strängvärdet för catalog med namnet på en katalog med CREATE CATALOG och USE CATALOG behörigheter. Ersätt strängvärdet för external_location med sökvägen för en extern plats med READ FILES, WRITE FILESoch CREATE EXTERNAL TABLE behörigheter.

Externa platser kan definieras som en hel lagringscontainer, men pekar ofta på en katalog som är kapslad i en container.

Rätt format för en extern platssökväg är "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Om du kör den här cellen bör du skriva ut en rad som läser 12 byte, skriva ut strängen "Hello world!", och visa alla databaser som finns i den angivna katalogen. Om du inte kan få den här cellen att köras kontrollerar du att du befinner dig i en Unity Catalog-aktiverad arbetsyta och begär rätt behörigheter från arbetsyteadministratören för att slutföra den här självstudien.

Python-koden nedan använder din e-postadress för att skapa en unik databas i den angivna katalogen och en unik lagringsplats på den externa platsen. Om du kör den här cellen tas alla data som är associerade med den här självstudien bort, så att du kan köra det här exemplet idempotently. En klass definieras och instansieras som du ska använda för att simulera batchar med data som anländer från ett anslutet system till källans externa plats.

Kopiera den här koden till en ny cell i anteckningsboken och kör den för att konfigurera din miljö.

Kommentar

Variablerna som definieras i den här koden bör göra det möjligt att köra den på ett säkert sätt utan risk för konflikt med befintliga arbetsytetillgångar eller andra användare. Begränsade nätverks- eller lagringsbehörigheter ger upphov till fel när den här koden körs. kontakta arbetsyteadministratören för att felsöka dessa begränsningar.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Du kan nu landa en batch med data genom att kopiera följande kod till en cell och köra den. Du kan köra den här cellen manuellt upp till 60 gånger för att utlösa ny data ankomst.

RawData.land_batch()

Steg 4: Konfigurera automatisk inläsning för att mata in data till Unity Catalog

Databricks rekommenderar att du lagrar data med Delta Lake. Delta Lake är ett öppen källkod lagringslager som tillhandahåller ACID-transaktioner och möjliggör datasjöhuset. Delta Lake är standardformatet för tabeller som skapats i Databricks.

Om du vill konfigurera automatisk inläsning för att mata in data till en Unity Catalog-tabell kopierar du och klistrar in följande kod i en tom cell i anteckningsboken:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Mer information om automatisk inläsning finns i Vad är automatisk inläsning?.

Mer information om strukturerad direktuppspelning med Unity Catalog finns i Använda Unity Catalog med strukturerad direktuppspelning.

Steg 5: Bearbeta och interagera med data

Notebook-filer kör logikcell för cell. Använd de här stegen för att köra logiken i cellen:

  1. Om du vill köra cellen som du slutförde i föregående steg markerar du cellen och trycker på SKIFT+RETUR.

  2. Om du vill köra frågor mot tabellen som du just har skapat kopierar du och klistrar in följande kod i en tom cell och trycker sedan på SKIFT+RETUR för att köra cellen.

    df = spark.read.table(table)
    
  3. Om du vill förhandsgranska data i dataramen kopierar du och klistrar in följande kod i en tom cell och trycker sedan på SKIFT+RETUR för att köra cellen.

    display(df)
    

Mer information om interaktiva alternativ för att visualisera data finns i Visualiseringar i Databricks Notebooks.

Steg 6: Schemalägga ett jobb

Du kan köra Databricks-notebook-filer som produktionsskript genom att lägga till dem som en uppgift i ett Databricks-jobb. I det här steget skapar du ett nytt jobb som du kan utlösa manuellt.

Så här schemalägger du anteckningsboken som en uppgift:

  1. Klicka på Schemalägg till höger i rubrikfältet.
  2. Ange ett unikt namn för jobbnamnet.
  3. Klicka på Manuell.
  4. I listrutan Kluster väljer du det kluster som du skapade i steg 1.
  5. Klicka på Skapa.
  6. I fönstret som visas klickar du på Kör nu.
  7. Om du vill se resultatet av jobbkörningen Extern länk klickar du på ikonen bredvid tidsstämpeln Senaste körning .

Mer information om jobb finns i Vad är Databricks-jobb?.

Steg 7: Frågetabell från Databricks SQL

Alla som har behörigheten USE CATALOG i den aktuella katalogen, behörigheten USE SCHEMA för det aktuella schemat och SELECT behörigheter i tabellen kan köra frågor mot innehållet i tabellen från deras önskade Databricks-API.

Du behöver åtkomst till ett SQL-lager som körs för att köra frågor i Databricks SQL.

Tabellen som du skapade tidigare i den här självstudien har namnet target_table. Du kan köra frågor mot den med hjälp av katalogen som du angav i den första cellen och databasen med faderskapet e2e_lakehouse_<your-username>. Du kan använda Katalogutforskaren för att hitta de dataobjekt som du skapade.

Ytterligare integreringar

Läs mer om integreringar och verktyg för datateknik med Azure Databricks: