Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Důležité
Tato funkce je ve verzi Public Preview.
Tato create_auto_cdc_from_snapshot_flow funkce vytvoří tok, který využívá funkce zachytávání změn dat (CDC) v deklarativních kanálech Lakeflow Spark ke zpracování zdrojových dat ze snímků databáze. Podívejte se, jak se CDC implementuje pomocí rozhraní API.
Poznámka:
Tato funkce nahrazuje předchozí funkci apply_changes_from_snapshot(). Obě funkce mají stejný podpis. Databricks doporučuje aktualizovat tak, aby používal nový název.
Důležité
Pro tuto operaci musíte mít cílovou streamovací tabulku.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table().
Syntaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Poznámka:
Při AUTO CDC FROM SNAPSHOT zpracování je výchozím chováním vložit nový řádek, pokud v cíli neexistuje odpovídající záznam se stejnými klíči. Pokud existuje odpovídající záznam, aktualizuje se pouze v případě, že se změnily některé hodnoty v řádku. Řádky s klíči, které jsou přítomné v cíli, ale již nejsou ve zdroji, se vymažou.
Další informace o zpracování CDC pomocí snímků najdete v tématu Rozhraní API AUTO CDC: Zjednodušení zachytávání dat změn pomocí datových kanálů. Příklady použití funkce najdete v pravidelných příkladech create_auto_cdc_from_snapshot_flow() snímků a historických příkladů příjmu snímků.
Parametry
| Parameter | Typ | Description |
|---|---|---|
target |
str |
Povinné. Název tabulky, která se má aktualizovat. Pomocí funkce create_streaming_table() můžete před spuštěním create_auto_cdc_from_snapshot_flow() funkce vytvořit cílovou tabulku. |
source |
str nebo lambda function |
Povinné. Název tabulky nebo zobrazení pro pravidelné snímky nebo funkci lambda Pythonu, která vrací datový rámec snímku, který se má zpracovat, a verzi snímku. Viz Implementace argumentusource. |
keys |
list |
Povinné. Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto:
|
stored_as_scd_type |
str nebo int |
Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2. Výchozí hodnota je SCD typu 1. |
track_history_column_list nebo track_history_except_column_list |
list |
Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Slouží track_history_except_column_list k určení sloupců, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId). Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný track_history_column_list argument nebo track_history_except_column_list argument. |
Implementace argumentu source
Funkce create_auto_cdc_from_snapshot_flow() obsahuje source argument. Pro zpracování historických snímků se očekává, že argumentem je funkce lambda Pythonu, která vrací dvě hodnoty source funkce: datový rámec Pythonu obsahující data snímku, create_auto_cdc_from_snapshot_flow() která se mají zpracovat, a verzi snímku.
Následuje podpis funkce lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumentem funkce lambda je nejnovější zpracovaná verze snímku.
- Návratová hodnota funkce lambda je
Nonenebo řazená kolekce členů dvou hodnot: První hodnota řazené kolekce členů je datový rámec obsahující snímek, který se má zpracovat. Druhá hodnota n-tice je verze snímku, která udává logické pořadí snímku.
Příklad, který implementuje a volá funkci lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Běhové prostředí deklarativních potrubí Lakeflow Spark provádí při každé aktivaci potrubí, které obsahuje funkci create_auto_cdc_from_snapshot_flow(), následující kroky:
-
next_snapshot_and_versionSpustí funkci, která načte další datový rámec snímku a odpovídající verzi snímku. - Pokud se nevrátí žádný DataFrame, spuštění se ukončí a aktualizace pipeline se označí jako dokončená.
- Rozpozná změny v novém snímku a postupně je použije na cílovou tabulku.
- Vrací se ke kroku č. 1, aby načetl další snímek a jeho verzi.