Megosztás a következőn keresztül:


create_auto_cdc_flow

A create_auto_cdc_flow() függvény létrehoz egy folyamatot, amely a Lakeflow Spark Deklaratív folyamatok módosítási adatrögzítési (CDC) funkcióját használja a forrásadatok változásadat-adatcsatornából (CDF) való feldolgozásához.

Megjegyzés:

Ez a függvény lecseréli az előző függvényt apply_changes(). A két függvény ugyanazzal a szignatúrával rendelkezik. A Databricks azt javasolja, hogy frissítsen az új név használatára.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A céltábla sémájának create_auto_cdc_flow() megadásakor a __START_AT mezőkkel azonos adattípusú __END_AT és sequence_by oszlopokat kell tartalmaznia.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt a folyamat Python-felületén.

Szemantika

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

A create_auto_cdc_flow feldolgozásához az alapértelmezett viselkedés a INSERT és UPDATE események esetében az, hogy a forrásból származó CDC-eseményeket frissítjük és beszúrjuk: frissítse a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy szúrjon be egy új sort, ha egyező rekord nem található a céltáblában. Az események kezelése DELETE a apply_as_deletes paraméterrel adható meg.

Ha többet szeretne megtudni a CDC változáscsatornával történő feldolgozásáról, olvassa el az AUTO CDC API-kat: Egyszerűsítse a változásadat-rögzítést a folyamatokkal. A create_auto_cdc_flow() függvény használatának példáját lásd itt: Példa: SCD 1. és SCD 2. típusú feldolgozás CDF forrásadatokkal.

Paraméterek

Paraméter Típus Description
target str Szükséges. A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát create_auto_cdc_flow() .
source str Szükséges. A CDC-rekordokat tartalmazó adatforrás.
keys list Szükséges. Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
  • Sztringek listája: ["userId", "orderId"]
  • A Spark SQL-függvények col() listája: [col("userId"), col("orderId")]. A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).
sequence_by str, col() vagy struct() Szükséges. A forrásadatokban a CDC-események logikai sorrendjét meghatározó oszlopnevek. A Lakeflow Spark Declarative Pipelines ezt a sorrendet használja a sorrenden kívül érkező események kezelésére. A megadott oszlopnak rendezhető adattípusnak kell lennie. Megadhatja az alábbiakat:
  • Karakterlánc: "sequenceNum"
  • Spark SQL-függvény col() : col("sequenceNum"). A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).
  • Több struct() oszlop összekapcsolása a kötések megszakításához: struct("timestamp_col", "id_col")először az első struct mező, majd a második mező szerint rendezi, ha döntetlen van, és így tovább.
ignore_null_updates bool A céloszlopok egy részhalmazát tartalmazó frissítések betöltésének engedélyezése. Ha egy CDC-esemény egyezik egy meglévő sorralignore_null_updates, és True az, az null oszlopok megőrzik a célhelyen meglévő értékeiket. Ez a beágyazott oszlopokra is vonatkozik, amelynek értéke a .null Ha ignore_null_updatesFalse van, a meglévő értékek felülíródnak a null értékekkel.
Az alapértelmezett érték a False.
apply_as_deletes str vagy expr() Megadja, hogy a CDC-eseményeket mikor kell DELETE-ként, és nem upsertként kezelni. Megadhatja az alábbiakat:
  • Karakterlánc: "Operation = 'DELETE'"
  • Spark SQL-függvény expr() : expr("Operation = 'DELETE'")

A rendelésen kívüli adatok kezeléséhez a törölt sor ideiglenesen sírkőként marad meg az alapul szolgáló Delta-táblában, és létrejön egy nézet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz alapértelmezés szerint két nap, és a táblatulajdonságokkal pipelines.cdc.tombstoneGCThresholdInSeconds konfigurálható.
apply_as_truncates str vagy expr() Megadja, hogy a CDC-események mikor legyenek teljes táblaként TRUNCATEkezelve. Megadhatja az alábbiakat:
  • Karakterlánc: "Operation = 'TRUNCATE'"
  • Spark SQL-függvény expr() : expr("Operation = 'TRUNCATE'")

Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható. A apply_as_truncates paraméter csak az 1. SCD-típus esetén támogatott. A 2. SCD-típus nem támogatja a csonkolási műveleteket.
column_list vagy except_column_list list A céltáblában szerepeltetni kívánt oszlopok egy részhalmaza. A belefoglalandó oszlopok teljes listájának megadására használható column_list . Itt except_column_list adhatja meg a kizárni kívánt oszlopokat. Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem column_list vagy except_column_list argumentumot ad át a függvénynek.
stored_as_scd_type str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa be 1 formátumra SCD 1-es típushoz, vagy 2 formátumra SCD 2-es típushoz. Az alapértelmezett scd típus 1.
track_history_column_list vagy track_history_except_column_list list A céltábla történetéhez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . A nyomon követésből kizárandó oszlopok megadására használható track_history_except_column_list . Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem track_history_column_list vagy track_history_except_column_list argumentumot ad át a függvénynek.
name str A folyam neve. Ha nincs megadva, az alapértelmezett érték ugyanaz, mint targeta .
once bool Igény szerint definiálja a folyamatot egyszeri folyamatként, például visszatöltésként. A once=True kétféleképpen változtatja meg a folyamatot.
  • A visszatérési érték. streaming-query. Ebben az esetben kötegelt DataFrame-nek kell lennie, nem folyamatos DataFrame-nek.
  • A folyamat alapértelmezés szerint egyszer fut. Ha a folyamat teljes frissítéssel frissül, akkor a ONCE folyamat újra fut az adatok újbóli létrehozásához.