Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Funkce create_auto_cdc_flow() vytvoří tok, který používá funkci zachycení změn dat (CDC) Lakeflow Spark Declarative Pipelines ke zpracování zdrojových dat z datového kanálu změn (CDF).
Poznámka:
Tato funkce nahrazuje předchozí funkci apply_changes(). Obě funkce mají stejný podpis. Databricks doporučuje aktualizovat tak, aby používal nový název.
Důležité
Pokud chcete použít změny, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. Při zadávání schématu create_auto_cdc_flow() cílové tabulky musíte zahrnout sloupce __START_AT a __END_AT se stejným datovým typem jako sequence_by pole.
K vytvoření požadované cílové tabulky můžete použít funkci create_streaming_table() v rozhraní Python pro sestavování datového toku.
Syntaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Pro zpracování create_auto_cdc_flow je výchozím chováním u událostí INSERT a UPDATE přidat nebo aktualizovat (upsertovat) události CDC ze zdroje: aktualizace všech řádků v cílové tabulce, které odpovídají zadaným klíčům, nebo vložení nového řádku, pokud v cílové tabulce neexistuje odpovídající záznam.
DELETE Zpracování událostí lze zadat pomocí parametruapply_as_deletes.
Další informace o zpracování CDC pomocí toku změn najdete v tématu Rozhraní API automatického CDC: Zjednodušte zachytávání změn dat pomocí pipeline. Příklad použití create_auto_cdc_flow() funkce, viz Příklad: SCD typ 1 a SCD typ 2 zpracování se zdrojovými daty CDF.
Parametry
| Parameter | Typ | Description |
|---|---|---|
target |
str |
Povinné. Název tabulky, která se má aktualizovat. Pomocí funkce create_streaming_table() můžete před spuštěním create_auto_cdc_flow() funkce vytvořit cílovou tabulku. |
source |
str |
Povinné. Zdroj dat obsahující záznamy CDC. |
keys |
list |
Povinné. Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce. Můžete zadat jednu z těchto:
|
sequence_by |
str, col() nebo struct() |
Povinné. Názvy sloupců určující logické pořadí událostí CDC ve zdrojových datech. Deklarativní kanály Sparku Lakeflow používají toto sekvencování ke zpracování událostí změn, které přicházejí mimo pořadí. Zadaný sloupec musí mít typ dat, který lze řadit. Můžete zadat jednu z těchto:
|
ignore_null_updates |
bool |
Povolit ingestování aktualizací obsahujících podmnožinu cílových sloupců Když událost CDC odpovídá existujícímu řádku a ignore_null_updates je True, sloupce s null si zachovají své hodnoty v cílovém systému. To platí také pro vnořené sloupce s hodnotou null. Pokud ignore_null_updates je hodnota False, existující hodnoty se přepíšou null hodnotami.Výchozí hodnota je False. |
apply_as_deletes |
str nebo expr() |
Určuje, kdy se má událost CDC považovat za událost DELETE , nikoli jako upsert. Můžete zadat jednu z těchto:
Aby bylo možné zpracovat data mimo pořadí, odstraněný řádek se dočasně zachová jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které vyfiltruje tyto náhrobky. Interval uchovávání je ve výchozím nastavení dva dny a dá se nakonfigurovat s pipelines.cdc.tombstoneGCThresholdInSeconds vlastností tabulky. |
apply_as_truncates |
str nebo expr() |
Určuje, kdy má být událost CDC považována za úplnou tabulku TRUNCATE. Můžete zadat jednu z těchto:
Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci. Parametr apply_as_truncates je podporován pouze pro SCD typu 1. SCD typu 2 nepodporuje trunkovací operace. |
column_list nebo except_column_list |
list |
Podmnožina sloupců, které se mají zahrnout do cílové tabulky. Slouží column_list k určení kompletního seznamu sloupců, které se mají zahrnout. Slouží except_column_list k určení sloupců, které chcete vyloučit. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId). Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný column_list argument nebo except_column_list argument. |
stored_as_scd_type |
str nebo int |
Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2. Nastavte na 1 typ SCD 1 nebo 2 pro SCD typ 2. Výchozí hodnota je SCD typu 1. |
track_history_column_list nebo track_history_except_column_list |
list |
Podmnožina výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Slouží track_history_column_list k určení úplného seznamu sloupců, které se mají sledovat. Slouží track_history_except_column_list k určení sloupců, které mají být vyloučeny ze sledování. Můžete deklarovat hodnotu jako seznam řetězců nebo jako funkce Spark SQL col() :
Argumenty funkcí col() nemohou obsahovat kvalifikátory. Můžete například použít col(userId), ale nemůžete použít col(source.userId). Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud funkci nepředáte žádný track_history_column_list argument nebo track_history_except_column_list argument. |
name |
str |
Název toku. Pokud není zadaný, výchozí hodnota je stejná jako target. |
once |
bool |
Volitelně můžete tok definovat jako jednorázový tok, například jako backfill. Použití once=True změní tok dvěma způsoby:
|