Sdílet prostřednictvím


Vývoj kódu kanálu pomocí Pythonu

Deklarativní kanály Sparku Lakeflow (SDP) zavádí několik nových konstruktorů kódu Pythonu pro definování materializovaných zobrazení a streamovaných tabulek v kanálech. Podpora Pythonu pro vývoj kanálů vychází ze základních funkcí PySpark DataFrame a rozhraní API strukturovaného streamování.

Pro uživatele, kteří nezná Python a datové rámce, databricks doporučuje používat rozhraní SQL. Viz Vývoj kódu deklarativních kanálů Sparku Lakeflow pomocí SQL.

Úplnou referenci syntaxe Pythonu pro Lakeflow SDP najdete v Referenční příručce jazyka Python deklarativních kanálů Lakeflow Spark.

Základy Pythonu pro vývoj pipelinů

Kód Pythonu, který vytváří datové sady pipline, musí vracet datové rámce.

V modulu pyspark.pipelines jsou implementována všechna rozhraní API Python deklarativních kanálů Sparku Lakeflow. Kód potrubí implementovaný pomocí Pythonu musí explicitně importovat modul pipelines v horní části zdrojového kódu Pythonu. V našich příkladech použijeme následující příkaz importu a dp k odkazování na pipelines.

from pyspark import pipelines as dp

Poznámka:

Apache Spark™ zahrnuje deklarativní kanály začínající ve Sparku pyspark.pipelines 4.1, které jsou dostupné prostřednictvím modulu. Databricks Runtime rozšiřuje tyto opensourcové funkce s dalšími rozhraními API a integracemi pro spravované produkční použití.

Kód napsaný pomocí opensourcového pipelines modulu běží bez úprav v Azure Databricks. Následující funkce nejsou součástí Apache Sparku:

  • dp.create_auto_cdc_flow
  • dp.create_auto_cdc_from_snapshot_flow
  • @dp.expect(...)
  • @dp.temporary_view

Kanál čte a zapisuje ve výchozím nastavení do katalogu a schématu zadaného během konfigurace kanálu. Viz Nastavení cílového katalogu a schématu.

Kód Pythonu specifický pro kanály se liší od jiných typů kódu Pythonu jedním kritickým způsobem: Kód kanálu Pythonu přímo nevyvolá funkce, které provádějí příjem a transformaci dat při vytváření datových sad. Místo toho protokol SDP interpretuje funkce dekorátoru z modulu ve všech souborech zdrojového dp kódu nakonfigurovaných v pipeline a vytvoří dataflow graf.

Důležité

Abyste se vyhnuli neočekávanému chování při spuštění kanálu, nezahrňte kód, který může mít vedlejší účinky, do funkcí, které definují datové sady. Další informace najdete v referenčních příručkách Pythonu.

Vytvoření materializovaného zobrazení nebo tabulky streamování pomocí Pythonu

Použijte @dp.table, abyste vytvořili tabulku pro streamování z výsledků čtení streamu. Použijte @dp.materialized_view k vytvoření materializovaného zobrazení z výsledků dávkového čtení.

Ve výchozím nastavení jsou materializované názvy tabulek zobrazení a streamovaných tabulek odvozeny z názvů funkcí. Následující příklad kódu ukazuje základní syntaxi pro vytvoření materializované tabulky zobrazení a streamování:

Poznámka:

Obě funkce odkazují na stejnou tabulku v katalogu samples a používají stejnou funkci dekorátoru. Tyto příklady zvýrazňují, že jediný rozdíl v základní syntaxi materializovaných zobrazení a streamovaných tabulek je použití spark.read oproti spark.readStream.

Ne všechny zdroje dat podporují streamované čtení. Některé zdroje dat by se měly vždy zpracovávat sémantikou streamování.

from pyspark import pipelines as dp

@dp.materialized_view()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Volitelně můžete název tabulky zadat pomocí argumentu name v dekorátoru @dp.table. Následující příklad ukazuje tento model pro materializované zobrazení a streamovací tabulku:

from pyspark import pipelines as dp

@dp.materialized_view(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dp.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Načtení dat z úložiště objektů

Kanály podporují načítání dat ze všech formátů podporovaných službou Azure Databricks. Viz Možnosti formátu dat.

Poznámka:

Tyto příklady používají data dostupná v /databricks-datasets automaticky připojená k vašemu pracovnímu prostoru. Databricks doporučuje používat cesty svazků nebo cloudové identifikátory URI k odkazování na data uložená v cloudovém úložišti objektů. Viz Co jsou svazky katalogu Unity?.

Databricks doporučuje používat Auto Loader a streamovací tabulky při konfiguraci úloh pro inkrementální příjem dat z dat uložených v cloudovém objektovém úložišti. Podívejte se na Co je to Auto Loader?

Následující příklad ukazuje, jak vytvořit streamovací tabulku ze souborů JSON pomocí Auto Loaderu:

from pyspark import pipelines as dp

@dp.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Následující příklad používá sémantiku dávky ke čtení adresáře JSON a vytvoření materializovaného zobrazení:

from pyspark import pipelines as dp

@dp.materialized_view()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Ověření dat s očekáváními

Pomocí očekávání můžete nastavit a vynutit omezení kvality dat. Viz Spravujte kvalitu dat pomocí požadavků na datový potrubí.

Následující kód pomocí @dp.expect_or_drop definuje očekávanou pojmenovanou valid_data, která během příjmu dat zahodí záznamy, které mají hodnotu null:

from pyspark import pipelines as dp

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Provádění dotazů na materializovaná zobrazení a streamované tabulky definované v datovém potrubí

Následující příklad definuje čtyři datové sady:

  • Streamovaná tabulka s názvem orders, která načítá data JSON.
  • Materializované zobrazení s názvem customers, které načte data CSV.
  • Materializované zobrazení s názvem customer_orders, které spojuje záznamy z datových sad orders a customers, přetypuje časové razítko objednávky na datum a vybere pole customer_id, order_number, statea order_date.
  • Materializované zobrazení s názvem daily_orders_by_state, které agreguje denní počet objednávek pro každý stav.

Poznámka:

Při dotazování zobrazení nebo tabulek v kanálu můžete přímo zadat katalog a schéma nebo můžete použít výchozí hodnoty nakonfigurované v kanálu. V tomto příkladu se tabulky orders, customersa customer_orders zapisují a čtou z výchozího katalogu a schématu nakonfigurovaného pro váš kanál.

Starší způsob publikace používá schéma LIVE k dotazování jiných materializovaných zobrazení a streamovaných tabulek definovaných ve vašem datovém toku. V nových potrubích je syntaxe schématu LIVE ignorována bez upozornění. Viz LIVE schema (starší verze).

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dp.materialized_view()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dp.materialized_view()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dp.materialized_view()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Vytváření tabulek ve smyčce for

Smyčky for Pythonu můžete použít k programovému vytvoření více tabulek. To může být užitečné v případě, že máte mnoho zdrojů dat nebo cílových datových sad, které se liší pouze několika parametry, což vede k menšímu celkovému kódu pro zachování a snížení redundance kódu.

Smyčka for vyhodnocuje logiku v sériovém pořadí, ale jakmile je plánování datových sad dokončeno, potrubí spouští logiku paralelně.

Důležité

Při použití tohoto vzoru pro definování datových sad se ujistěte, že seznam hodnot předaných do smyčky for je vždy aditivní. Pokud je datová sada dříve definovaná v kanálu vynechána z budoucího spuštění kanálu, tato datová sada se automaticky zahodí z cílového schématu.

Následující příklad vytvoří pět tabulek, které filtrují objednávky zákazníků podle oblastí. V tomto případě se název oblasti používá k nastavení názvu cílových materializovaných zobrazení a k filtrování zdrojových dat. Dočasná zobrazení slouží k definování spojení ze zdrojových tabulek používaných při vytváření konečných materializovaných zobrazení.

from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col

@dp.temporary_view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dp.temporary_view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Následuje příklad grafu toku dat pro tento kanál:

graf toku dat se dvěma zobrazeními vedoucími do pěti regionálních tabulek.

Řešení potíží: smyčka for vytváří mnoho tabulek se stejnými hodnotami

Opožděný model spouštění, který pipeliny používají k vyhodnocení kódu Pythonu, vyžaduje, aby logika přímo odkazovala na jednotlivé hodnoty, když je vyvolána funkce dekorovaná @dp.materialized_view().

Následující příklad ukazuje dva správné přístupy k definování tabulek pomocí smyčky for. V obou příkladech je každý název tabulky ze seznamu tables explicitně odkazován v rámci funkce zdobené @dp.materialized_view().

from pyspark import pipelines as dp

# Create a parent function to set local variables

def create_table(table_name):
  @dp.materialized_view(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dp.materialized_view()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized_view(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Následující příklad neodkazuje na hodnoty správně. Tento příklad vytvoří tabulky s jedinečnými názvy, ale všechny tabulky načítají data z poslední hodnoty ve smyčce for:

from pyspark import pipelines as dp

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dp.materialized(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Trvalé odstranění záznamů z materializovaného zobrazení nebo tabulky streamování

Pokud chcete trvale odstranit záznamy z materializovaného zobrazení nebo tabulky streamování s povolenými vektory odstranění, jako je například dodržování předpisů GDPR, musí být u podkladových tabulek Delta objektu provedeny další operace. Pokud chcete zajistit odstranění záznamů z materializovaného zobrazení, přečtěte si Trvalé odstranění záznamů z materializovaného zobrazení s povolenými vektory odstranění. Pokud chcete zajistit odstranění záznamů z tabulky streamování, přečtěte si téma Trvalé odstranění záznamů z tabulky streamování.