Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A create_auto_cdc_flow() függvény létrehoz egy folyamatot, amely a Lakeflow Spark Deklaratív folyamatok módosítási adatrögzítési (CDC) funkcióját használja a forrásadatok változásadat-adatcsatornából (CDF) való feldolgozásához.
Megjegyzés:
Ez a függvény lecseréli az előző függvényt apply_changes(). A két függvény ugyanazzal a szignatúrával rendelkezik. A Databricks azt javasolja, hogy frissítsen az új név használatára.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának create_auto_cdc_flow() megadásakor a __START_AT mezőkkel azonos adattípusú __END_AT és sequence_by oszlopokat kell tartalmaznia.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a folyamat Python-felületén.
Szemantika
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
A create_auto_cdc_flow feldolgozásához az alapértelmezett viselkedés a INSERT és UPDATE események esetében az, hogy a forrásból származó CDC-eseményeket frissítjük és beszúrjuk: frissítse a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy szúrjon be egy új sort, ha egyező rekord nem található a céltáblában. Az események kezelése DELETE a apply_as_deletes paraméterrel adható meg.
Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el az AUTO CDC API-kat: Egyszerűsítse a változásadat-rögzítést a folyamatokkal. A create_auto_cdc_flow() függvény használatának példáját lásd itt: Példa: SCD 1. és SCD 2. típusú feldolgozás CDF forrásadatokkal.
Paraméterek
| Paraméter | Típus | Description |
|---|---|---|
target |
str |
Szükséges. A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát create_auto_cdc_flow() . |
source |
str |
Szükséges. A CDC-rekordokat tartalmazó adatforrás. |
keys |
list |
Szükséges. Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
|
sequence_by |
str, col() vagy struct() |
Szükséges. A forrásadatokban a CDC-események logikai sorrendjét meghatározó oszlopnevek. A Lakeflow Spark Declarative Pipelines ezt a sorrendet használja a sorrenden kívül érkező események kezelésére. A megadott oszlopnak rendezhető adattípusnak kell lennie. Megadhatja az alábbiakat:
|
ignore_null_updates |
bool |
A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sorralignore_null_updates, és True az, az null oszlopok megőrzik a célhelyen meglévő értékeiket. Ez a beágyazott oszlopokra is vonatkozik, amelynek értéke a .null Ha ignore_null_updatesFalse van, a meglévő értékek felülíródnak a null értékekkel.Az alapértelmezett érték a False. |
apply_as_deletes |
str vagy expr() |
Megadja, hogy a CDC-eseményeket mikor kell DELETE-ként, és nem upsertként kezelni. Megadhatja az alábbiakat:
A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz alapértelmezés szerint két nap, és a táblatulajdonságokkal pipelines.cdc.tombstoneGCThresholdInSeconds konfigurálható. |
apply_as_truncates |
str vagy expr() |
Megadja, hogy a CDC-események mikor legyenek teljes táblaként TRUNCATEkezelve. Megadhatja az alábbiakat:
Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható. A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket. |
column_list vagy except_column_list |
list |
A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A belefoglalandó oszlopok teljes listájának megadására használható column_list . Itt except_column_list adhatja meg a kizárni kívánt oszlopokat. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem column_list vagy except_column_list argumentumot ad át a függvénynek. |
stored_as_scd_type |
str vagy int |
A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa be 1 formátumra SCD 1-es típushoz, vagy 2 formátumra SCD 2-es típushoz. Az alapértelmezett scd típus 1. |
track_history_column_list vagy track_history_except_column_list |
list |
A céltábla történetéhez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . A nyomon követésből kizárandó oszlopok megadására használható track_history_except_column_list . Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem track_history_column_list vagy track_history_except_column_list argumentumot ad át a függvénynek. |
name |
str |
A folyam neve. Ha nincs megadva, az alapértelmezett érték ugyanaz, mint targeta . |
once |
bool |
Igény szerint definiálja a folyamatot egyszeri folyamatként, például visszatöltésként. A once=True kétféleképpen változtatja meg a folyamatot.
|