Sdílet prostřednictvím


create_auto_cdc_from_snapshot_flow

Důležité

Tato funkce je ve verzi Public Preview.

Tato create_auto_cdc_from_snapshot_flow funkce vytvoří tok, který využívá funkce zachytávání změn dat (CDC) v deklarativních kanálech Lakeflow Spark ke zpracování zdrojových dat ze snímků databáze. Podívejte se, jak se CDC implementuje pomocí rozhraní API.

Poznámka:

Tato funkce nahrazuje předchozí funkci apply_changes_from_snapshot(). Obě funkce mají stejný podpis. Databricks doporučuje aktualizovat tak, aby používal nový název.

Důležité

Pro tuto operaci musíte mít cílovou streamovací tabulku.

K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table().

Syntaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Poznámka:

Při AUTO CDC FROM SNAPSHOT zpracování je výchozím chováním vložit nový řádek, pokud v cíli neexistuje odpovídající záznam se stejnými klíči. Pokud existuje odpovídající záznam, aktualizuje se pouze v případě, že se změnily některé hodnoty v řádku. Řádky s klíči, které jsou přítomné v cíli, ale již nejsou ve zdroji, se vymažou.

Další informace o zpracování CDC pomocí snímků najdete v tématu Rozhraní API AUTO CDC: Zjednodušení zachytávání dat změn pomocí datových kanálů. Příklady použití funkce najdete v pravidelných příkladech create_auto_cdc_from_snapshot_flow() snímků a historických příkladů příjmu snímků.

Parametry

Parameter Typ Description
target str Povinné. Název tabulky, která se má aktualizovat. Pomocí funkce create_streaming_table() můžete před spuštěním create_auto_cdc_from_snapshot_flow() funkce vytvořit cílovou tabulku.
source str nebo lambda function Povinné. Název tabulky nebo zobrazení pro pravidelné snímky nebo funkci lambda Pythonu, která vrací datový rámec snímku, který se má zpracovat, a verzi snímku. Viz Implementace argumentusource.
keys list Povinné. Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto:
  • Seznam řetězců: ["userId", "orderId"]
  • Seznam funkcí Spark SQL col() : [col("userId"), col("orderId"]. Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId).
stored_as_scd_type str nebo int Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2. Výchozí hodnota je SCD typu 1.
track_history_column_list nebo track_history_except_column_list list Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Slouží track_history_except_column_list k určení sloupců, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId). Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný track_history_column_list argument nebo track_history_except_column_list argument.

Implementace argumentu source

Funkce create_auto_cdc_from_snapshot_flow() obsahuje source argument. Pro zpracování historických snímků se očekává, že argumentem je funkce lambda Pythonu, která vrací dvě hodnoty source funkce: datový rámec Pythonu obsahující data snímku, create_auto_cdc_from_snapshot_flow() která se mají zpracovat, a verzi snímku.

Následuje podpis funkce lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentem funkce lambda je nejnovější zpracovaná verze snímku.
  • Návratová hodnota funkce lambda je None nebo řazená kolekce členů dvou hodnot: První hodnota řazené kolekce členů je datový rámec obsahující snímek, který se má zpracovat. Druhá hodnota n-tice je verze snímku, která udává logické pořadí snímku.

Příklad, který implementuje a volá funkci lambda:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Běhové prostředí deklarativních potrubí Lakeflow Spark provádí při každé aktivaci potrubí, které obsahuje funkci create_auto_cdc_from_snapshot_flow(), následující kroky:

  1. next_snapshot_and_version Spustí funkci, která načte další datový rámec snímku a odpovídající verzi snímku.
  2. Pokud se nevrátí žádný DataFrame, spuštění se ukončí a aktualizace pipeline se označí jako dokončená.
  3. Rozpozná změny v novém snímku a postupně je použije na cílovou tabulku.
  4. Vrací se ke kroku č. 1, aby načetl další snímek a jeho verzi.