Delen via


Pijplijncode ontwikkelen met Python

Lakeflow Spark-declaratieve pijplijnen (SDP) introduceert verschillende nieuwe Python-codeconstructies voor het definiëren van gerealiseerde weergaven en streamingtabellen in pijplijnen. Python-ondersteuning voor het ontwikkelen van pijplijnen is gebaseerd op de basisbeginselen van PySpark DataFrame en Structured Streaming-API's.

Voor gebruikers die niet bekend zijn met Python en DataFrames, raadt Databricks het gebruik van de SQL-interface aan. Zie Code voor declaratieve pijplijnen van Lakeflow Spark ontwikkelen met SQL.

Zie voor een volledige verwijzing naar de Python-syntaxis van Lakeflow SDP de Python-taalreferentie van Lakeflow Spark declaratieve pijplijnen.

Basisbeginselen van Python voor pijplijnontwikkeling

Python-code waarmee pipeline-gegevenssets worden gemaakt, moet DataFrames retourneren.

Alle Python-API's van Lakeflow Spark-declaratieve pijplijnen worden geïmplementeerd in de pyspark.pipelines module. Uw pijplijncode die met Python is geïmplementeerd, moet de pipelines module expliciet boven aan de Python-bron importeren. In onze voorbeelden gebruiken we de volgende importopdracht en gebruiken dp we in voorbeelden waarnaar pipelineswordt verwezen.

from pyspark import pipelines as dp

Opmerking

Apache Spark™ bevat declaratieve pijplijnen vanaf Spark 4.1, die beschikbaar zijn via de pyspark.pipelines module. Databricks Runtime breidt deze opensource-mogelijkheden uit met aanvullende API's en integraties voor beheerd productiegebruik.

Code die is geschreven met de opensource-module pipelines wordt uitgevoerd zonder wijzigingen in Azure Databricks. De volgende functies maken geen deel uit van Apache Spark:

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)
  • @dp.temporary_view

Pijplijn leest en schrijft standaard naar de catalogus en het schema dat is opgegeven tijdens de pijplijnconfiguratie. Zie De doelcatalogus en het schema instellen.

Pijplijnspecifieke Python-code verschilt van andere typen Python-code op een kritieke manier: Python-pijplijncode roept niet rechtstreeks de functies aan die gegevensopname en transformatie uitvoeren om gegevenssets te maken. In plaats daarvan interpreteert SDP de decoratorfuncties uit de dp module in alle broncodebestanden die zijn geconfigureerd in een pijplijn en bouwt een gegevensstroomgrafiek.

Belangrijk

Als u onverwacht gedrag wilt voorkomen wanneer uw pijplijn wordt uitgevoerd, moet u geen code opnemen die mogelijk neveneffecten heeft in uw functies die gegevenssets definiëren. Zie de Python-verwijzing voor meer informatie.

Een gerealiseerde weergave of streamingtabel maken met Python

Hiermee @dp.table maakt u een streamingtabel op basis van de resultaten van een streaming-leesbewerking. Gebruik @dp.materialized_view om een geëmaterialiseerde weergave te maken op basis van de resultaten van een batchleesbewerking.

Standaard worden gerealiseerde weergave- en streamingtabelnamen afgeleid van functienamen. In het volgende codevoorbeeld ziet u de basissyntaxis voor het maken van een gerealiseerde weergave en streamingtabel:

Opmerking

Beide functies verwijzen naar dezelfde tabel in de samples catalogus en gebruiken dezelfde decoratorfunctie. In deze voorbeelden wordt benadrukt dat het enige verschil in de basissyntaxis voor gematerialiseerde weergaven en streamingtabellen het gebruik van spark.read tegenover spark.readStreamis.

Niet alle gegevensbronnen ondersteunen streaming-leesbewerkingen. Sommige gegevensbronnen moeten altijd worden verwerkt met semantiek voor streaming.

from pyspark import pipelines as dp

@dp.materialized_view()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

U kunt desgewenst de tabelnaam opgeven met behulp van het argument name in de @dp.table decorator. In het volgende voorbeeld ziet u dit patroon voor een gematerialiseerde weergave en streamingtabel:

from pyspark import pipelines as dp

@dp.materialized_view(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Gegevens laden uit objectopslag

Pijplijnen ondersteunen het laden van gegevens uit alle indelingen die worden ondersteund door Azure Databricks. Zie opties voor gegevensindeling.

Opmerking

In deze voorbeelden wordt gebruik gemaakt van gegevens die via de automatisch aan uw werkruimte gekoppelde /databricks-datasets beschikbaar zijn. Databricks raadt aan volumepaden of cloud-URI's te gebruiken om te verwijzen naar gegevens die zijn opgeslagen in cloudobjectopslag. Zie Wat zijn Unity Catalog-volumes?

Databricks raadt aan om automatisch laden en streamingtabellen te gebruiken bij het configureren van incrementele opnameworkloads voor gegevens die zijn opgeslagen in de opslag van cloudobjecten. Zie Wat is Auto Loader?

In het volgende voorbeeld wordt een streamingtabel gemaakt op basis van JSON-bestanden met behulp van automatisch laden:

from pyspark import pipelines as dp

@dp.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

In het volgende voorbeeld wordt batch-semantiek gebruikt om een JSON-map te lezen en een gerealiseerde weergave te maken:

from pyspark import pipelines as dp

@dp.materialized_view()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Gegevens valideren met verwachtingen

U kunt verwachtingen gebruiken om beperkingen voor gegevenskwaliteit in te stellen en af te dwingen. Zie Gegevenskwaliteit beheren met de verwachtingen van pijplijnen.

De volgende code gebruikt @dp.expect_or_drop om een verwachting te definiëren die records valid_data verwijdert die null zijn tijdens gegevensopname:

from pyspark import pipelines as dp

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Gematerialiseerde weergaven en streamingtabellen opvragen die in uw pijplijn zijn gedefinieerd

In het volgende voorbeeld worden vier gegevenssets gedefinieerd:

  • Een streamingtabel met de naam orders waarmee JSON-gegevens worden geladen.
  • Een gerealiseerde weergave met de naam customers waarmee CSV-gegevens worden geladen.
  • Een gerealiseerde weergave genaamd customer_orders die records uit de orders- en customers-gegevenssets koppelt, de order-tijdstempel naar een datum omzet en de velden customer_id, order_number, stateen order_date selecteert.
  • Een gerealiseerde weergave met de naam daily_orders_by_state waarmee het dagelijkse aantal orders voor elke status wordt geaggregeerd.

Opmerking

Wanneer u query's uitvoert op weergaven of tabellen in uw pijplijn, kunt u de catalogus en het schema rechtstreeks opgeven of kunt u de standaardinstellingen gebruiken die zijn geconfigureerd in uw pijplijn. In dit voorbeeld worden de orders, customersen customer_orders tabellen geschreven en gelezen uit de standaardcatalogus en het standaardschema dat is geconfigureerd voor uw pijplijn.

Traditionele publicatiemodus maakt gebruik van het LIVE schema om een query uit te voeren op andere gematerialiseerde weergaven en streamingtabellen die zijn gedefinieerd in uw pijplijn. In nieuwe pijplijnen wordt de syntaxis van het LIVE schema stilletjes genegeerd. Zie LIVE-schema (verouderd).

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dp.materialized_view()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dp.materialized_view()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dp.materialized_view()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Tabellen maken in een for lus

U kunt Python for-lussen gebruiken om programmatisch meerdere tabellen te maken. Dit kan handig zijn wanneer u veel gegevensbronnen of doel-datasets hebt die slechts met een paar parameters variëren, wat resulteert in minder totale code om te onderhouden en minder coderedundantie.

De for lus evalueert logica in seriële volgorde, maar zodra de planning voor de gegevenssets is voltooid, wordt de logica van de pijplijn parallel uitgevoerd.

Belangrijk

Wanneer u dit patroon gebruikt om gegevenssets te definiëren, moet u ervoor zorgen dat de lijst met waarden die worden doorgegeven aan de for lus altijd additief is. Als een gegevensset die eerder in een pijplijn is gedefinieerd, wordt weggelaten uit een toekomstige pijplijnuitvoering, wordt die gegevensset automatisch verwijderd uit het doelschema.

In het volgende voorbeeld worden vijf tabellen gemaakt waarmee klantorders per regio worden gefilterd. Hier wordt de regionaam gebruikt om de naam van de doelmaterialized views in te stellen en de brongegevens te filteren. Tijdelijke weergaven worden gebruikt om koppelingen te definiëren uit de brontabellen die worden gebruikt bij het samenstellen van de uiteindelijke gematerialiseerde weergaven.

from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col

@dp.temporary_view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dp.temporary_view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Hier volgt een voorbeeld van de gegevensstroomgrafiek voor deze pijplijn:

Een gegevensstroomgrafiek van twee weergaven die leiden tot vijf regionale tabellen.

Problemen oplossen: for lus maakt veel tabellen met dezelfde waarden

Het luie uitvoeringsmodel dat pijplijnen gebruiken om Python-code te evalueren, vereist dat je logica direct naar individuele waarden verwijst wanneer de functie, die door @dp.materialized_view() is versierd, wordt aangeroepen.

In het volgende voorbeeld ziet u twee juiste benaderingen voor het definiëren van tabellen met een for lus. In beide voorbeelden wordt expliciet naar elke tabelnaam uit de tables lijst verwezen binnen de functie die is gedecoreerd door @dp.materialized_view().

from pyspark import pipelines as dp

# Create a parent function to set local variables

def create_table(table_name):
  @dp.materialized_view(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dp.materialized_view()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized_view(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Het volgende voorbeeld verwijst niet correct naar waarden. In dit voorbeeld worden tabellen met afzonderlijke namen gemaakt, maar alle tabellen laden gegevens van de laatste waarde in de for lus:

from pyspark import pipelines as dp

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Records definitief verwijderen uit een gerealiseerde weergave of streamingtabel

Als u records definitief wilt verwijderen uit een gerealiseerde weergave of streamingtabel waarvoor verwijderingsvectoren zijn ingeschakeld, zoals voor AVG-naleving, moeten extra bewerkingen worden uitgevoerd op de onderliggende Delta-tabellen van het object. Om ervoor te zorgen dat records uit een gerealiseerde weergave worden verwijderd, zie Records definitief verwijderen uit een gerealiseerde weergave met verwijderingsvectoren ingeschakeld. Zie Records definitief verwijderen uit een streamingtabelom ervoor te zorgen dat records uit een streamingtabel worden verwijderd.