Zelfstudie: Een end-to-end lakehouse-analysepijplijn uitvoeren

In deze zelfstudie leert u hoe u een end-to-end analysepijplijn instelt voor een Azure Databricks Lakehouse.

Belangrijk

In deze zelfstudie worden interactieve notebooks gebruikt om algemene ETL-taken in Python op Unity Catalog-clusters te voltooien. Als u geen Unity Catalog gebruikt, raadpleegt u Uw eerste ETL-workload uitvoeren in Azure Databricks.

Taken in deze zelfstudie

Aan het einde van dit artikel voelt u zich comfortabel:

  1. Een rekencluster met Unity Catalog starten.
  2. Een Databricks-notebook maken.
  3. Gegevens schrijven en lezen vanaf een externe locatie van een Unity Catalog.
  4. Incrementele gegevensopname configureren voor een Unity Catalog-tabel met Automatisch laden.
  5. Notebookcellen uitvoeren om gegevens te verwerken, op te vragen en te bekijken.
  6. Een notebook plannen als een Databricks-taak.
  7. Query's uitvoeren op Unity Catalog-tabellen vanuit Databricks SQL

Azure Databricks biedt een reeks hulpprogramma's die gereed zijn voor productie waarmee gegevensprofessionals snel ETL-pijplijnen (extract, transform and load) kunnen ontwikkelen en implementeren. Met Unity Catalog kunnen gegevensstewards opslagreferenties, externe locaties en databaseobjecten configureren en beveiligen voor gebruikers binnen een organisatie. Met Databricks SQL kunnen analisten SQL-query's uitvoeren op dezelfde tabellen die worden gebruikt in ETL-productieworkloads, zodat realtime business intelligence op schaal mogelijk is.

U kunt ook Delta Live Tables gebruiken om ETL-pijplijnen te bouwen. Databricks heeft Delta Live Tables gemaakt om de complexiteit van het bouwen, implementeren en onderhouden van ETL-pijplijnen voor productie te verminderen. Zie zelfstudie: Uw eerste Delta Live Tables-pijplijn uitvoeren.

Vereisten

Notitie

Als u geen bevoegdheden voor clusterbeheer hebt, kunt u de meeste van de onderstaande stappen nog steeds voltooien zolang u toegang hebt tot een cluster.

Stap 1: Een cluster maken

Als u verkennende gegevensanalyse en data engineering wilt uitvoeren, maakt u een cluster om de rekenresources te bieden die nodig zijn om opdrachten uit te voeren.

  1. Klik op rekenpictogramCompute in de zijbalk.
  2. Klik op Nieuw pictogramNieuw in de zijbalk en selecteer Cluster. Hiermee opent u de pagina Nieuw cluster/compute.
  3. Geef een unieke naam op voor het cluster.
  4. Selecteer het keuzerondje met één knooppunt .
  5. Selecteer Eén gebruiker in de vervolgkeuzelijst Toegangsmodus .
  6. Zorg ervoor dat uw e-mailadres zichtbaar is in het veld Eén gebruiker .
  7. Selecteer de gewenste Versie van Databricks Runtime, 11.1 of hoger om Unity Catalog te gebruiken.
  8. Klik op Rekenproces maken om het cluster te maken.

Zie Compute voor meer informatie over Databricks-clusters.

Stap 2: Een Databricks-notebook maken

Maak een notebook om aan de slag te gaan met het schrijven en uitvoeren van interactieve code in Azure Databricks.

  1. Klik op Nieuw pictogramNieuw in de zijbalk en klik vervolgens op Notitieblok.
  2. Op de pagina Notitieblok maken:
    • Geef een unieke naam op voor uw notitieblok.
    • Zorg ervoor dat de standaardtaal is ingesteld op Python.
    • Gebruik de vervolgkeuzelijst Verbinding maken om het cluster te selecteren dat u in stap 1 hebt gemaakt in de vervolgkeuzelijst Cluster.

Het notitieblok wordt geopend met één lege cel.

Zie Notitieblokken beheren voor meer informatie over het maken en beheren van notitieblokken.

Stap 3: Gegevens schrijven en lezen vanaf een externe locatie die wordt beheerd door Unity Catalog

Databricks raadt het gebruik van automatische laadprogramma's aan voor incrementele gegevensopname. Automatisch laden detecteert en verwerkt automatisch nieuwe bestanden wanneer ze binnenkomen in de opslag van cloudobjecten.

Gebruik Unity Catalog om beveiligde toegang tot externe locaties te beheren. Gebruikers of service-principals met READ FILES machtigingen op een externe locatie kunnen AutoLoader gebruiken om gegevens op te nemen.

Normaal gesproken komen gegevens binnen op een externe locatie vanwege schrijfbewerkingen van andere systemen. In deze demo kunt u de aankomst van gegevens simuleren door JSON-bestanden naar een externe locatie te schrijven.

Kopieer de onderstaande code naar een notebookcel. Vervang de tekenreekswaarde door catalog de naam van een catalogus door CREATE CATALOG en USE CATALOG machtigingen. Vervang de tekenreekswaarde voor external_location door het pad voor een externe locatie door READ FILES, WRITE FILESen CREATE EXTERNAL TABLE machtigingen.

Externe locaties kunnen worden gedefinieerd als een volledige opslagcontainer, maar verwijzen vaak naar een map die in een container is genest.

De juiste indeling voor een pad naar een externe locatie is "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Als u deze cel uitvoert, moet een regel worden afgedrukt die 12 bytes leest, de tekenreeks 'Hallo wereld!' afdrukt en alle databases weergeeft die aanwezig zijn in de opgegeven catalogus. Als u deze cel niet kunt uitvoeren, controleert u of u zich in een werkruimte met Unity Catalog bevindt en vraagt u de juiste machtigingen aan van uw werkruimtebeheerder om deze zelfstudie te voltooien.

De onderstaande Python-code gebruikt uw e-mailadres om een unieke database te maken in de opgegeven catalogus en een unieke opslaglocatie op externe locatie. Als u deze cel uitvoert, worden alle gegevens verwijderd die aan deze zelfstudie zijn gekoppeld, zodat u dit voorbeeld idempotent kunt uitvoeren. Er wordt een klasse gedefinieerd en geïnstantieerd die u gaat gebruiken om batches met gegevens te simuleren die afkomstig zijn van een verbonden systeem naar uw externe bronlocatie.

Kopieer deze code naar een nieuwe cel in uw notebook en voer deze uit om uw omgeving te configureren.

Notitie

Met de variabelen die in deze code zijn gedefinieerd, kunt u deze veilig uitvoeren zonder dat er een conflict bestaat met bestaande werkruimteactiva of andere gebruikers. Beperkte netwerk- of opslagmachtigingen veroorzaken fouten bij het uitvoeren van deze code; neem contact op met uw werkruimtebeheerder om problemen met deze beperkingen op te lossen.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

U kunt nu een batch met gegevens plaatsen door de volgende code naar een cel te kopiëren en uit te voeren. U kunt deze cel tot 60 keer handmatig uitvoeren om nieuwe gegevens te activeren.

RawData.land_batch()

Stap 4: Automatisch laden configureren om gegevens op te nemen in Unity Catalog

Databricks raadt aan om gegevens op te slaan met Delta Lake. Delta Lake is een opensource-opslaglaag die ACID-transacties biedt en die data lakehouse mogelijk maakt. Delta Lake is de standaardindeling voor tabellen die zijn gemaakt in Databricks.

Als u automatisch laden wilt configureren om gegevens op te nemen in een Unity Catalog-tabel, kopieert en plakt u de volgende code in een lege cel in uw notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Zie Wat is Automatisch laadprogramma?voor meer informatie over Automatisch laden.

Zie Unity Catalog gebruiken met Structured Streaming met Structured Streaming voor meer informatie over Structured Streaming met Unity Catalog.

Stap 5: Gegevens verwerken en ermee werken

Notebooks voeren logische cellen per cel uit. Gebruik deze stappen om de logica in uw cel uit te voeren:

  1. Als u de cel wilt uitvoeren die u in de vorige stap hebt voltooid, selecteert u de cel en drukt u op Shift+Enter.

  2. Als u een query wilt uitvoeren op de tabel die u zojuist hebt gemaakt, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.

    df = spark.read.table(table_name)
    
  3. Als u een voorbeeld van de gegevens in uw DataFrame wilt bekijken, kopieert en plakt u de volgende code in een lege cel en drukt u op Shift+Enter om de cel uit te voeren.

    display(df)
    

Zie Visualisaties in Databricks-notebooks voor meer informatie over interactieve opties voor het visualiseren van gegevens.

Stap 6: Een taak plannen

U kunt Databricks-notebooks uitvoeren als productiescripts door ze toe te voegen als een taak in een Databricks-taak. In deze stap maakt u een nieuwe taak die u handmatig kunt activeren.

Uw notitieblok als taak plannen:

  1. Klik aan de rechterkant van de koptekstbalk op Planning .
  2. Voer een unieke naam in voor de taaknaam.
  3. Klik op Handmatig.
  4. Selecteer in de vervolgkeuzelijst Cluster het cluster dat u in stap 1 hebt gemaakt.
  5. Klik op Create.
  6. Klik in het venster dat wordt weergegeven op Nu uitvoeren.
  7. Als u de resultaten van de taakuitvoering wilt zien, klikt u op het Externe koppeling pictogram naast de tijdstempel van de laatste uitvoering .

Zie Wat is Azure Databricks Jobs? voor meer informatie over taken.

Stap 7: Een query uitvoeren op een tabel uit Databricks SQL

Iedereen met de USE CATALOG machtiging voor de huidige catalogus, de USE SCHEMA machtiging voor het huidige schema en SELECT machtigingen voor de tabel kan een query uitvoeren op de inhoud van de tabel vanuit de gewenste Databricks-API.

U hebt toegang nodig tot een actief SQL-warehouse om query's uit te voeren in Databricks SQL.

De tabel die u eerder in deze zelfstudie hebt gemaakt, heeft de naam target_table. U kunt er query's op uitvoeren met behulp van de catalogus die u hebt opgegeven in de eerste cel en de database met de vadern e2e_lakehouse_<your-username>. U kunt Catalog Explorer gebruiken om de gegevensobjecten te vinden die u hebt gemaakt.

Aanvullende integraties

Meer informatie over integraties en hulpprogramma's voor data engineering met Azure Databricks: