Sdílet prostřednictvím


Referenční dokumentace jazyka Python pro rozdílové živé tabulky

Tento článek obsahuje podrobnosti o programovacím rozhraní Delta Live Tables Python.

Informace o rozhraní SQL API najdete v referenčních informacích k jazyku SQL Delta Live Tables.

Podrobnosti týkající se konfigurace automatického zavaděče najdete v tématu Co je automatický zavaděč?.

Než začnete

Při implementaci kanálů s rozhraním Delta Live Tables Python je důležité vzít v úvahu následující skutečnosti:

  • Vzhledem k tomu, že Python table() a view() funkce se během plánování a spuštění aktualizace kanálu vyvolávají vícekrát, nezahrnujte do jedné z těchto funkcí kód, který může mít vedlejší účinky (například kód, který upravuje data nebo odesílá e-mail). Aby nedocházelo k neočekávanému chování, měly by funkce Pythonu, které definují datové sady, obsahovat pouze kód potřebný k definování tabulky nebo zobrazení.
  • K provádění operací, jako je odesílání e-mailů nebo integrace s externí monitorovací službou, zejména ve funkcích, které definují datové sady, použijte háky událostí. Implementace těchto operací ve funkcích, které definují datové sady, způsobí neočekávané chování.
  • table Python a view funkce musí vracet datový rámec. Některé funkce, které pracují s datovými rámci, nevrací datové rámce a neměly by se používat. Mezi tyto operace patří funkce, jako collect()jsou , count(), toPandas(), save()a saveAsTable(). Vzhledem k tomu, že transformace datového rámce se provádějí po vyřešení celého grafu toku dat, použití těchto operací může mít nežádoucí vedlejší účinky.

Import modulu Pythonu dlt

Funkce Pythonu Delta Live Tables jsou definovány v dlt modulu. Vaše kanály implementované pomocí rozhraní Python API musí importovat tento modul:

import dlt

Vytvoření materializovaného zobrazení nebo streamované tabulky Delta Live Tables

Rozdílové živé tabulky v Pythonu určují, jestli se má datová sada aktualizovat jako materializovaná nebo streamovaná tabulka na základě definovaného dotazu. Dekorátor @table lze použít k definování materializovaných zobrazení i streamovaných tabulek.

Pokud chcete definovat materializované zobrazení v Pythonu, použijte @table dotaz, který provádí statické čtení proti zdroji dat. Pokud chcete definovat streamovací tabulku, použijte @table dotaz, který provádí čtení streamování vůči zdroji dat nebo používá funkci create_streaming_table(). Oba typy datových sad mají stejnou specifikaci syntaxe:

Poznámka:

Pokud chcete tento argument použít cluster_by k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Vytvoření zobrazení Delta Live Tables

Pokud chcete definovat zobrazení v Pythonu @view , použijte dekorátor. @table Podobně jako dekorátor můžete pro statické datové sady nebo streamované datové sady použít zobrazení v dynamických tabulkách Delta. Následuje syntaxe pro definování zobrazení pomocí Pythonu:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Příklad: Definování tabulek a zobrazení

Pokud chcete definovat tabulku nebo zobrazení v Pythonu, použijte @dlt.view u funkce dekorátor nebo @dlt.table dekorátor. Název funkce nebo name parametr můžete použít k přiřazení názvu tabulky nebo zobrazení. Následující příklad definuje dvě různé datové sady: zobrazení označované taxi_raw jako vstupní zdroj přebírá soubor JSON a tabulku, filtered_data která přebírá taxi_raw zobrazení jako vstup:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Příklad: Přístup k datové sadě definované ve stejném kanálu

Kromě čtení z externích zdrojů dat můžete přistupovat k datovým sadám definovaným ve stejném kanálu pomocí funkce Delta Live Tables read() . Následující příklad ukazuje vytvoření customers_filtered datové sady pomocí read() funkce:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Funkci můžete také použít spark.table() pro přístup k datové sadě definované ve stejném kanálu. Při použití spark.table() funkce pro přístup k datové sadě definované v kanálu, v argumentu funkce předpenpen LIVE klíčové slovo na název datové sady:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Příklad: Čtení z tabulky zaregistrované v metastoru

Pokud chcete číst data z tabulky zaregistrované v metastoru Hive, v argumentu funkce vynechat LIVE klíčové slovo a volitelně kvalifikovat název tabulky s názvem databáze:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Příklad čtení z tabulky katalogu Unity najdete v tématu Ingestování dat do kanálu katalogu Unity.

Příklad: Přístup k datové sadě pomocí spark.sql

Datovou sadu můžete vrátit také pomocí výrazu spark.sql ve funkci dotazu. Pokud chcete číst z interní datové sady, předejděte LIVE. název datové sady:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Vytvoření tabulky, která se použije jako cíl operací streamování

create_streaming_table() Pomocí funkce můžete vytvořit cílovou tabulku pro výstup záznamů podle operací streamování, včetně apply_changes(), apply_changes_from_snapshot() a @append_flow výstupních záznamů.

Poznámka:

Funkce create_target_table() jsou create_streaming_live_table() zastaralé. Databricks doporučuje aktualizovat existující kód tak, aby funkci používal create_streaming_table() .

Poznámka:

Pokud chcete tento argument použít cluster_by k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumenty
name

Typ: str

Název tabulky.

Tento parametr je povinný.
comment

Typ: str

Volitelný popis tabulky.
spark_conf

Typ: dict

Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu.
table_properties

Typ: dict

Volitelný seznam vlastností tabulky pro tabulku.
partition_cols

Typ: array

Volitelný seznam jednoho nebo více sloupců, které se mají použít k dělení tabulky.
cluster_by

Typ: array

Volitelně můžete povolit clustering liquid v tabulce a definovat sloupce, které se mají použít jako klíče clusteringu.

Viz Použití liquid clusteringu pro tabulky Delta.
path

Typ: str

Volitelné umístění úložiště pro data tabulky. Pokud není nastavená, systém ve výchozím nastavení nastaví umístění úložiště kanálu.
schema

Typ: str nebo StructType

Volitelná definice schématu pro tabulku. Schémata je možné definovat jako řetězec DDL SQL nebo pomocí Pythonu.
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Volitelná omezení kvality dat pro tabulku Podívejte se na několik očekávání.
row_filter (Public Preview)

Typ: str

Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců.

Řízení způsobu materializace tabulek

Tabulky také nabízejí další kontrolu nad jejich materializací:

  • Určete, jak se tabulky rozdělují pomocí partition_cols. K urychlení dotazů můžete použít dělení.
  • Vlastnosti tabulky můžete nastavit při definování zobrazení nebo tabulky. Viz Vlastnosti tabulky Delta Live Tables.
  • Nastavte umístění úložiště pro data tabulky pomocí path nastavení. Ve výchozím nastavení se data tabulky ukládají v umístění úložiště kanálu, pokud path nejsou nastavená.
  • V definici schématu můžete použít vygenerované sloupce . Viz příklad: Zadejte schéma a sloupce oddílů.

Poznámka:

U tabulek, které mají velikost menší než 1 TB, databricks doporučuje řídit organizaci dat Delta Live Tables. Sloupce oddílů byste neměli zadávat, pokud neočekáváte, že se tabulka rozšíří nad terabajt.

Příklad: Zadání schématu a sloupců oddílů

Volitelně můžete zadat schéma tabulky pomocí Pythonu StructType nebo řetězce SQL DDL. Při zadání pomocí řetězce DDL může definice obsahovat vygenerované sloupce.

Následující příklad vytvoří tabulku volanou sales se schématem zadaným pomocí Pythonu StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Následující příklad určuje schéma tabulky pomocí řetězce DDL, definuje vygenerovaný sloupec a definuje sloupec oddílu:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Delta Live Tables ve výchozím nastavení odvodí schéma z table definice, pokud nezadáte schéma.

Konfigurace streamované tabulky tak, aby ignorovala změny ve zdrojové streamovací tabulce

Poznámka:

  • Příznak skipChangeCommits funguje jenom s spark.readStream použitím option() funkce. Tento příznak nelze použít ve dlt.read_stream() funkci.
  • Příznak nelze použítskipChangeCommits, pokud je zdrojová tabulka streamování definována jako cíl funkce apply_changes().

Ve výchozím nastavení streamované tabulky vyžadují zdroje jen pro připojení. Pokud streamovací tabulka používá jako zdroj jinou streamovací tabulku a zdrojová streamovací tabulka vyžaduje aktualizace nebo odstranění, například zpracování gdpr "právo na zapomenutí", skipChangeCommits může být příznak nastaven při čtení zdrojové tabulky streamování, aby tyto změny ignoroval. Další informace o tomto příznaku naleznete v tématu Ignorovat aktualizace a odstranění.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Příklad: Definování omezení tabulky

Důležité

Omezení tabulky jsou ve verzi Public Preview.

Při zadávání schématu můžete definovat primární a cizí klíče. Omezení jsou informativní a nevynucují se. Viz klauzule CONSTRAINT v referenční dokumentaci jazyka SQL.

Následující příklad definuje tabulku s omezením primárního a cizího klíče:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Příklad: Definování filtru řádků a masky sloupců

Důležité

Filtry řádků a masky sloupců jsou ve verzi Public Preview.

Pokud chcete vytvořit materializované zobrazení nebo tabulku Streamování s filtrem řádků a maskou sloupců, použijte klauzuli ROW FILTER a klauzuli MASK. Následující příklad ukazuje, jak definovat materializované zobrazení a streamovací tabulku s filtrem řádků i maskou sloupce:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Další informace o filtrech řádků a maskách sloupců najdete v tématu Publikování tabulek s filtry řádků a maskami sloupců.

Python Delta Live Tables – vlastnosti

Následující tabulky popisují možnosti a vlastnosti, které můžete zadat při definování tabulek a zobrazení pomocí delta živých tabulek:

Poznámka:

Pokud chcete tento argument použít cluster_by k povolení clusteringu liquid, musí být kanál nakonfigurovaný tak, aby používal kanál preview.

@table nebo @view
name

Typ: str

Volitelný název tabulky nebo zobrazení. Pokud není definován, název funkce se použije jako název tabulky nebo zobrazení.
comment

Typ: str

Volitelný popis tabulky.
spark_conf

Typ: dict

Volitelný seznam konfigurací Sparku pro spuštění tohoto dotazu.
table_properties

Typ: dict

Volitelný seznam vlastností tabulky pro tabulku.
path

Typ: str

Volitelné umístění úložiště pro data tabulky. Pokud není nastavená, systém ve výchozím nastavení nastaví umístění úložiště kanálu.
partition_cols

Typ: a collection of str

Volitelná kolekce, list například jeden nebo více sloupců, které se mají použít k dělení tabulky.
cluster_by

Typ: array

Volitelně můžete povolit clustering liquid v tabulce a definovat sloupce, které se mají použít jako klíče clusteringu.

Viz Použití liquid clusteringu pro tabulky Delta.
schema

Typ: str nebo StructType

Volitelná definice schématu pro tabulku. Schémata lze definovat jako řetězec DDL SQL nebo pomocí Pythonu StructType.
temporary

Typ: bool

Vytvořte tabulku, ale nepublikujte metadata pro tabulku. Klíčové temporary slovo dává Delta Live Tables pokyn k vytvoření tabulky, která je k dispozici pro kanál, ale neměla by být přístupná mimo kanál. Aby se zkrátila doba zpracování, dočasná tabulka se zachová po celou dobu životnosti kanálu, který ho vytvoří, a ne jenom jednu aktualizaci.

Výchozí hodnota je False.
row_filter (Public Preview)

Typ: str

Volitelná klauzule filtru řádků pro tabulku. Viz Publikování tabulek s filtry řádků a maskami sloupců.
Definice tabulky nebo zobrazení
def <function-name>()

Funkce Pythonu, která definuje datovou sadu. name Pokud parametr není nastavený, <function-name> použije se jako název cílové datové sady.
query

Příkaz Spark SQL, který vrací datový rámec Spark Dataset nebo Koalas.

Použijte dlt.read() nebo spark.table() proveďte úplné čtení z datové sady definované ve stejném kanálu. Při použití spark.table() funkce ke čtení z datové sady definované ve stejném kanálu předpřipravené LIVE klíčové slovo na název datové sady v argumentu funkce. Pokud chcete například číst z datové sady s názvem customers:

spark.table("LIVE.customers")

Funkci můžete také použít spark.table() ke čtení z tabulky registrované v metastoru tak, že vynecháte LIVE klíčové slovo a volitelně kvalifikujete název tabulky s názvem databáze:

spark.table("sales.customers")

Slouží dlt.read_stream() k provedení streamování čtení z datové sady definované ve stejném kanálu.

spark.sql Pomocí funkce definujte dotaz SQL k vytvoření návratové datové sady.

Pomocí syntaxe PySpark definujte dotazy Delta Live Tables pomocí Pythonu.
Očekávání
@expect("description", "constraint")

Deklarace omezení kvality dat identifikovaných
description. Pokud řádek porušuje očekávání, zahrňte řádek do cílové datové sady.
@expect_or_drop("description", "constraint")

Deklarace omezení kvality dat identifikovaných
description. Pokud řádek porušuje očekávání, odstraňte řádek z cílové datové sady.
@expect_or_fail("description", "constraint")

Deklarace omezení kvality dat identifikovaných
description. Pokud řádek porušuje očekávání, okamžitě zastavte provádění.
@expect_all(expectations)

Deklarujte jedno nebo více omezení kvality dat.
expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, zahrňte řádek do cílové datové sady.
@expect_all_or_drop(expectations)

Deklarujte jedno nebo více omezení kvality dat.
expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, odstraňte řádek z cílové datové sady.
@expect_all_or_fail(expectations)

Deklarujte jedno nebo více omezení kvality dat.
expectations je slovník Pythonu, kde klíč představuje očekávaný popis a hodnota je očekávané omezení. Pokud řádek porušuje jakékoli očekávání, okamžitě zastavte provádění.

Změna zachytávání dat z kanálu změn pomocí Pythonu v rozdílových živých tabulkách

apply_changes() Pomocí funkce v rozhraní Python API můžete pomocí funkce cdC (Delta Live Tables Change Data Capture) zpracovávat zdrojová data z datového kanálu změn (CDF).

Důležité

Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes() cílové tabulky musíte zahrnout sloupce __START_AT se __END_AT stejným datovým typem jako sequence_by pole.

K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní Pythonu Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Poznámka:

Pro APPLY CHANGES zpracování je výchozím chováním INSERT a UPDATE událostmi upsertovat události CDC ze zdroje: aktualizujte všechny řádky v cílové tabulce, které odpovídají zadaným klíčům, nebo vložte nový řádek, pokud v cílové tabulce neexistuje odpovídající záznam. Zpracování událostí DELETE lze zadat pomocí APPLY AS DELETE WHEN podmínky.

Další informace o zpracování CDC pomocí kanálu změn najdete v tématu Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek. Příklad použití apply_changes() funkce, viz Příklad: SCD typ 1 a SCD typ 2 zpracování se zdrojovými daty CDF.

Důležité

Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes cílové tabulky musíte zahrnout sloupce __START_AT se __END_AT stejným datovým typem jako sequence_by pole.

Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.

Argumenty
target

Typ: str

Název tabulky, která se má aktualizovat. Před spuštěním apply_changes() funkce můžete pomocí funkce create_streaming_table() vytvořit cílovou tabulku.

Tento parametr je povinný.
source

Typ: str

Zdroj dat obsahující záznamy CDC.

Tento parametr je povinný.
keys

Typ: list

Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce.

Můžete zadat jednu z těchto:

– Seznam řetězců: ["userId", "orderId"]
– Seznam funkcí Spark SQL col() : [col("userId"), col("orderId"]

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Tento parametr je povinný.
sequence_by

Typ: str nebo col()

Název sloupce určující logické pořadí událostí CDC ve zdrojových datech. Delta Live Tables používá toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí.

Můžete zadat jednu z těchto:

– Řetězec: "sequenceNum"
– Funkce Spark SQL col() : col("sequenceNum")

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Zadaný sloupec musí být seřazený datový typ.

Tento parametr je povinný.
ignore_null_updates

Typ: bool

Povolit ingestování aktualizací obsahujících podmnožinu cílových sloupců Když událost CDC odpovídá existujícímu řádku a ignore_null_updates je True, sloupce se null zachovají své stávající hodnoty v cíli. To platí také pro vnořené sloupce s hodnotou null. Pokud ignore_null_updates je hodnota False, existující hodnoty se přepíšou null hodnotami.

Tento parametr je volitelný.

Výchozí hodnota je False.
apply_as_deletes

Typ: str nebo expr()

Určuje, kdy se má událost CDC považovat za událost DELETE , nikoli jako upsert. Aby bylo možné zpracovat data mimo pořadí, odstraněný řádek se dočasně zachová jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které vyfiltruje tyto náhrobky. Interval uchovávání informací je možné nakonfigurovat pomocí
pipelines.cdc.tombstoneGCThresholdInSeconds vlastnost table.

Můžete zadat jednu z těchto:

– Řetězec: "Operation = 'DELETE'"
– Funkce Spark SQL expr() : expr("Operation = 'DELETE'")

Tento parametr je volitelný.
apply_as_truncates

Typ: str nebo expr()

Určuje, kdy má být událost CDC považována za úplnou tabulku TRUNCATE. Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci.

Parametr apply_as_truncates je podporován pouze pro SCD typu 1. ScD typu 2 nepodporuje operace zkrácení.

Můžete zadat jednu z těchto:

– Řetězec: "Operation = 'TRUNCATE'"
– Funkce Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Tento parametr je volitelný.
column_list

except_column_list

Typ: list

Podmnožina sloupců, které se mají zahrnout do cílové tabulky. Slouží column_list k určení kompletního seznamu sloupců, které se mají zahrnout. Slouží except_column_list k určení sloupců, které chcete vyloučit. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Tento parametr je volitelný.

Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný column_list argument nebo except_column_list argument.
stored_as_scd_type

Typ: str nebo int

Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2.

Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2.

Tato klauzule je nepovinná.

Výchozí hodnota je SCD typu 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Používání
track_history_except_column_list určit sloupce, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Tento parametr je volitelný.

Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud ne nebo ne track_history_column_list .
track_history_except_column_list funkce je předána argumentu.

Změna zachytávání dat ze snímků databáze pomocí Pythonu v rozdílových živých tabulkách

Důležité

Rozhraní APPLY CHANGES FROM SNAPSHOT API je ve verzi Public Preview.

apply_changes_from_snapshot() Pomocí funkce v rozhraní Python API můžete ke zpracování zdrojových dat ze snímků databáze použít funkci cdC (Delta Live Tables Change Data Capture).

Důležité

Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu apply_changes_from_snapshot() cílové tabulky musíte zahrnout __START_AT __END_AT i sloupce se stejným datovým typem jako sequence_by pole.

K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní Pythonu Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Poznámka:

Při APPLY CHANGES FROM SNAPSHOT zpracování je výchozím chováním vložit nový řádek, pokud v cíli neexistuje odpovídající záznam se stejnými klíči. Pokud existuje odpovídající záznam, aktualizuje se pouze v případě, že se změnily některé hodnoty v řádku. Řádky s klíči, které jsou přítomné v cíli, ale již nejsou ve zdroji, se odstraní.

Další informace o zpracování CDC pomocí snímků najdete v tématu Rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek. Příklady použití funkce najdete v pravidelných příkladech příjmu apply_changes_from_snapshot() snímků a historických příkladů příjmu snímků.

Argumenty
target

Typ: str

Název tabulky, která se má aktualizovat. Před spuštěním apply_changes() funkce můžete pomocí funkce create_streaming_table() vytvořit cílovou tabulku.

Tento parametr je povinný.
source

Typ: str nebo lambda function

Název tabulky nebo zobrazení pro pravidelné snímky nebo funkci lambda Pythonu, která vrací datový rámec snímku, který se má zpracovat, a verzi snímku. Viz Implementace zdrojového argumentu.

Tento parametr je povinný.
keys

Typ: list

Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce.

Můžete zadat jednu z těchto:

– Seznam řetězců: ["userId", "orderId"]
– Seznam funkcí Spark SQL col() : [col("userId"), col("orderId"]

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Tento parametr je povinný.
stored_as_scd_type

Typ: str nebo int

Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2.

Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2.

Tato klauzule je nepovinná.

Výchozí hodnota je SCD typu 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Používání
track_history_except_column_list určit sloupce, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).

Tento parametr je volitelný.

Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud ne nebo ne track_history_column_list .
track_history_except_column_list funkce je předána argumentu.

Implementace argumentu source

Funkce apply_changes_from_snapshot() obsahuje source argument. Pro zpracování historických snímků se očekává, že argumentem je funkce lambda Pythonu, která vrací dvě hodnoty apply_changes_from_snapshot() funkce: datový rámec Pythonu obsahující data snímku, source která se mají zpracovat, a verzi snímku.

Následuje podpis funkce lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentem funkce lambda je nejnovější zpracovaná verze snímku.
  • Návratová hodnota funkce lambda je None nebo řazená kolekce členů dvou hodnot: První hodnota řazené kolekce členů je datový rámec obsahující snímek, který se má zpracovat. Druhá hodnota řazené kolekce členů je verze snímku, která představuje logické pořadí snímku.

Příklad, který implementuje a volá funkci lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Modul runtime Delta Live Tables provede následující kroky pokaždé, když se aktivuje kanál obsahující apply_changes_from_snapshot() funkci:

  1. next_snapshot_and_version Spustí funkci, která načte další datový rámec snímku a odpovídající verzi snímku.
  2. Pokud se žádný datový rámec nevrátí, spuštění se ukončí a aktualizace kanálu se označí jako dokončená.
  3. Rozpozná změny v novém snímku a postupně je použije na cílovou tabulku.
  4. Vrátí krok 1, aby se načetl další snímek a jeho verze.

Omezení

Rozhraní Pythonu Delta Live Tables má následující omezení:

Funkce pivot() není podporována. Operace pivot ve Sparku vyžaduje dychtivé načítání vstupních dat pro výpočet schématu výstupu. Tato funkce není v dynamických tabulkách Delta podporovaná.