Megosztás a következőn keresztül:


create_auto_cdc_from_snapshot_flow

A create_auto_cdc_from_snapshot_flow függvény létrehoz egy folyamatot, amely a Lakeflow Spark Deklaratív folyamatok módosítási adatrögzítési (CDC) funkcióját használja a forrásadatok adatbázis-pillanatképekből való feldolgozásához. Lásd: Hogyan implementálják a CDC-t az AUTO CDC FROM SNAPSHOT API-val?

Megjegyzés:

Ez a függvény lecseréli az előző függvényt apply_changes_from_snapshot(). A két függvény ugyanazzal a szignatúrával rendelkezik. A Databricks azt javasolja, hogy frissítsen az új név használatára.

Fontos

Ehhez a művelethez rendelkeznie kell egy célstreamelési táblával.

A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt.

Szemantika

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Megjegyzés:

A AUTO CDC FROM SNAPSHOT feldolgozáshoz az alapértelmezett viselkedés az, hogy új sort szúr be, ha nem létezik azonos kulccsal rendelkező rekord a célobjektumban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.

A CDC pillanatképekkel történő feldolgozásával kapcsolatos további információkért lásd The AUTO CDC APIs: Az változáskövetés egyszerűsítése folyamatokkal. A függvény használatára vonatkozó példákért tekintse meg a create_auto_cdc_from_snapshot_flow()rendszeres pillanatkép-betöltési és az előzmény-pillanatkép-betöltési példákat.

Paraméterek

Paraméter Típus Description
target str Szükséges. A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát create_auto_cdc_from_snapshot_flow() .
source str vagy lambda function Szükséges. Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd : Az source argumentum implementálása.
keys list Szükséges. Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
  • Sztringek listája: ["userId", "orderId"]
  • A Spark SQL-függvények col() listája: [col("userId"), col("orderId"]. A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId).
stored_as_scd_type str vagy int A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa be 1 formátumra SCD 1-es típushoz, vagy 2 formátumra SCD 2-es típushoz. Az alapértelmezett scd típus 1.
track_history_column_list vagy track_history_except_column_list list A céltábla történetéhez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . A nyomon követésből kizárandó oszlopok megadására használható track_history_except_column_list . Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem track_history_column_list vagy track_history_except_column_list argumentumot ad át a függvénynek.

Az source argumentum megvalósítása

A create_auto_cdc_from_snapshot_flow() függvény tartalmazza az argumentumot source . Az előzmény pillanatképek feldolgozásához az source argumentum egy Python lambda függvény lesz, amely két értéket ad vissza a create_auto_cdc_from_snapshot_flow() függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.

A lambda függvény aláírása a következő:

lambda Any => Optional[(DataFrame, Any)]
  • A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép verziója.
  • A lambda függvény None visszatérési értéke vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A tuple második értéke a pillanatkép verzió, amely a pillanatkép logikai sorrendjét képviseli.

Példa a lambda függvény implementálására és meghívására:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

A Lakeflow Spark Deklaratív folyamatok futtatókörnyezete a következő lépéseket hajtja végre minden alkalommal, amikor a függvényt create_auto_cdc_from_snapshot_flow() tartalmazó folyamat aktiválódik:

  1. A függvény futtatásával next_snapshot_and_version betölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót.
  2. Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
  3. Észleli az új pillanatkép módosításait, és növekményesen alkalmazza őket a céltáblára.
  4. Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.