Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Fontos
Ez a funkció nyilvános előzetes verzióban érhető el.
A create_auto_cdc_from_snapshot_flow függvény létrehoz egy folyamatot, amely a Lakeflow Spark Deklaratív folyamatok módosítási adatrögzítési (CDC) funkcióját használja a forrásadatok adatbázis-pillanatképekből való feldolgozásához. Lásd: Hogyan implementálják a CDC-t az AUTO CDC FROM SNAPSHOT API-val?
Megjegyzés:
Ez a függvény lecseréli az előző függvényt apply_changes_from_snapshot(). A két függvény ugyanazzal a szignatúrával rendelkezik. A Databricks azt javasolja, hogy frissítsen az új név használatára.
Fontos
Ehhez a művelethez rendelkeznie kell egy célstreamelési táblával.
A szükséges céltábla létrehozásához használhatja a create_streaming_table() függvényt.
Szemantika
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Megjegyzés:
A AUTO CDC FROM SNAPSHOT feldolgozáshoz az alapértelmezett viselkedés az, hogy új sort szúr be, ha nem létezik azonos kulccsal rendelkező rekord a célobjektumban. Ha létezik egyező rekord, az csak akkor frissül, ha a sorban szereplő értékek bármelyike módosult. A célban található, de a forrásban már nem szereplő kulcsokat tartalmazó sorok törlődnek.
A CDC pillanatképekkel történő feldolgozásával kapcsolatos további információkért lásd The AUTO CDC APIs: Az változáskövetés egyszerűsítése folyamatokkal. A függvény használatára vonatkozó példákért tekintse meg a create_auto_cdc_from_snapshot_flow()rendszeres pillanatkép-betöltési és az előzmény-pillanatkép-betöltési példákat.
Paraméterek
| Paraméter | Típus | Description |
|---|---|---|
target |
str |
Szükséges. A frissíteni kívánt tábla neve. A create_streaming_table() függvénnyel a függvény végrehajtása előtt létrehozhatja a céltáblát create_auto_cdc_from_snapshot_flow() . |
source |
str vagy lambda function |
Szükséges. Egy táblázat vagy nézet neve, amely rendszeres időközönként pillanatképet hoz létre, vagy egy Python lambda függvény, amely visszaadja a feldolgozandó pillanatkép dataFrame-et és a pillanatkép verzióját. Lásd : Az source argumentum implementálása. |
keys |
list |
Szükséges. Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Ez annak azonosítására szolgál, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira. Megadhatja az alábbiakat:
|
stored_as_scd_type |
str vagy int |
A rekordok tárolása 1. vagy 2. SCD-típusként. Állítsa be 1 formátumra SCD 1-es típushoz, vagy 2 formátumra SCD 2-es típushoz. Az alapértelmezett scd típus 1. |
track_history_column_list vagy track_history_except_column_list |
list |
A céltábla történetéhez nyomon követendő kimeneti oszlopok egy részhalmaza. A követendő oszlopok teljes listájának megadására használható track_history_column_list . A nyomon követésből kizárandó oszlopok megadására használható track_history_except_column_list . Az érték deklarálható sztringek listájaként vagy Spark SQL-függvényként col() :
A függvények argumentumai col() nem tartalmazhatnak minősítőket. Használhatja például, col(userId)de nem használhatja col(source.userId). Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha nem track_history_column_list vagy track_history_except_column_list argumentumot ad át a függvénynek. |
Az source argumentum megvalósítása
A create_auto_cdc_from_snapshot_flow() függvény tartalmazza az argumentumot source . Az előzmény pillanatképek feldolgozásához az source argumentum egy Python lambda függvény lesz, amely két értéket ad vissza a create_auto_cdc_from_snapshot_flow() függvénynek: a feldolgozandó pillanatképadatokat tartalmazó Python DataFrame-et és egy pillanatkép-verziót.
A lambda függvény aláírása a következő:
lambda Any => Optional[(DataFrame, Any)]
- A lambda függvény argumentuma a legutóbb feldolgozott pillanatkép verziója.
- A lambda függvény
Nonevisszatérési értéke vagy két értékből álló rekord: A rekord első értéke a feldolgozandó pillanatképet tartalmazó DataFrame. A tuple második értéke a pillanatkép verzió, amely a pillanatkép logikai sorrendjét képviseli.
Példa a lambda függvény implementálására és meghívására:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
A Lakeflow Spark Deklaratív folyamatok futtatókörnyezete a következő lépéseket hajtja végre minden alkalommal, amikor a függvényt create_auto_cdc_from_snapshot_flow() tartalmazó folyamat aktiválódik:
- A függvény futtatásával
next_snapshot_and_versionbetölti a következő pillanatkép-adatkeretet és a megfelelő pillanatkép-verziót. - Ha a DataFrame nem ad vissza, a futtatás leáll, és a folyamatfrissítés befejezettként van megjelölve.
- Észleli az új pillanatkép módosításait, és növekményesen alkalmazza őket a céltáblára.
- Visszatér az 1. lépéshez, hogy betöltse a következő pillanatképet és annak verzióját.