Megosztás a következőn keresztül:


AUTO CDC INTO (folyamatok)

AUTO CDC ... INTO Az utasítással állítson be egy folyamatot, amely a Lakeflow Spark Deklaratív Pipeline-ek módosítási adatrögzítési (CDC) funkcióját használja. Ez az utasítás beolvassa egy CDC-forrás módosításait, és alkalmazza őket egy streamelési célobjektumra.

Szemantika

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

A cél adatminőségi korlátozásait ugyanazzal CONSTRAINT a záradékkal határozhatja meg, mint a többi folyamat lekérdezését. Lásd: Adatminőség kezelése folyamatelvárásokkal.

Az INSERT és UPDATE események alapértelmezett viselkedése az, hogy a forrásból származó CDC-eseményeket frissíti vagy beszúrja: frissíti a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy új sort szúr be, ha egyező rekord nem található a céltáblában. DELETE események kezelése a APPLY AS DELETE WHEN feltétellel meghatározható.

Fontos

A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A 2. típusú SCD-táblák esetében a céltábla sémájának megadásakor a __START_AT mezővel azonos adattípusú __END_AT és sequence_by oszlopokat is tartalmaznia kell.

Lásd az AUTO CDC API-k: Egyszerűsítse a változáskövető adatrögzítést a csővezetékekkel.

Paraméterek

  • flow_name

    A létrehozandó folyamat neve.

  • source

    Az adatok forrása. A forrásnak streamelési forrásnak kell lennie. A STREAM kulcsszóval stream-szemantikát használhat a forrásból való olvasáshoz. Ha az olvasás egy meglévő rekord módosítását vagy törlését tapasztalja, hibaüzenet jelenik meg. A legbiztonságosabb, ha statikus vagy csak hozzáfűző forrásokból olvas. A módosítási véglegesítéseket tartalmazó adatok betöltéséhez használhatja a Pythont és a SkipChangeCommits hibák kezelésére szolgáló lehetőséget.

    További információért az adatfolyamokról, tekintse meg a Folyamatokkal történő adatátalakítást.

  • KEYS

    Azok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Az oszlopokban szereplő értékek segítségével azonosíthatja, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.

    Az oszlopok kombinációjának meghatározásához használjon vesszővel tagolt oszloplistát.

    Ez a záradék kötelező.

  • IGNORE NULL UPDATES

    Lehetővé teszi a céloszlopok egy részhalmazát tartalmazó frissítések betöltését. Ha egy CDC-esemény megfelel egy meglévő sornak, és a NULL FRISSÍTÉSEK FIGYELMEN KÍVÜL HAGYása beállítás meg van adva, az értékekkel rendelkező null oszlopok megőrzik a célban meglévő értékeiket. Ez a null értékkel rendelkező beágyazott oszlopokra is vonatkozik.

    Ez a záradék nem kötelező.

    Az alapértelmezett az, hogy a meglévő oszlopokat a null értékekkel írjuk felül.

  • APPLY AS DELETE WHEN

    Megadja, hogy a CDC-eseményeket mikor kell DELETE-ként, és nem upsertként kezelni.

    A 2. típusú SCD-források esetében a rendezetlen adatok kezelésére a rendszer ideiglenesen megőrzi a törölt sort a háttérben lévő Delta-táblában, és létrehoz egy nézetet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz a pipelines.cdc.tombstoneGCThresholdInSeconds használatával konfigurálható.

    Ez a záradék nem kötelező.

  • APPLY AS TRUNCATE WHEN

    Megadja, hogy a CDC-események mikor legyenek teljes táblaként TRUNCATEkezelve. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.

    A APPLY AS TRUNCATE WHEN záradék csak az 1. SCD-típus esetében támogatott. A 2. SCD-típus nem támogatja a csonkolási műveletet.

    Ez a záradék nem kötelező.

  • SEQUENCE BY

    Az oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A pipeline feldolgozás ezzel a szekvenálással kezeli a nem sorrendben érkező változási eseményeket.

    Ha több oszlopra van szükség a szekvenáláshoz, használjon egy STRUCT kifejezést: először az első struct mező, majd a második mező szerint rendezi, ha van egy döntetlen, és így tovább.

    A megadott oszlopoknak rendezhető adattípusoknak kell lenniük.

    Ez a záradék kötelező.

  • COLUMNS

    A céltáblában szerepeltetni kívánt oszlopok egy részhalmazát adja meg. A következő lehetőségek közül választhat:

    • Adja meg a belefoglalandó oszlopok teljes listáját: COLUMNS (userId, name, city).
    • Adja meg a kizárandó oszlopok listáját: COLUMNS * EXCEPT (operation, sequenceNum)

    Ez a záradék nem kötelező.

    Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha a COLUMNS záradék nincs megadva.

  • STORED AS

    A rekordok tárolása 1. vagy 2. SCD-típusként.

    Ez a záradék nem kötelező.

    Az alapértelmezett scd típus 1.

  • TRACK HISTORY ON

    A kimeneti oszlopok egy részhalmazát adja meg, amely előzményrekordokat hoz létre a megadott oszlopok módosításakor. A következő lehetőségek közül választhat:

    • Adja meg a követendő oszlopok teljes listáját: COLUMNS (userId, name, city).
    • Adja meg a nyomon követésből kizárandó oszlopok listáját: COLUMNS * EXCEPT (operation, sequenceNum)

    Ez a záradék nem kötelező. Az alapértelmezett beállítás az összes kimeneti oszlop előzményeinek nyomon követése, ha bármilyen változás történt, ami egyenértékű azokkal TRACK HISTORY ON *.

Példák

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);