Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
AUTO CDC ... INTO Az utasítással állítson be egy folyamatot, amely a Lakeflow Spark Deklaratív Pipeline-ek módosítási adatrögzítési (CDC) funkcióját használja. Ez az utasítás beolvassa egy CDC-forrás módosításait, és alkalmazza őket egy streamelési célobjektumra.
- A CDC-vel kapcsolatos további információkért lásd : Mi az a változási adatrögzítés (CDC)?.
- A
AUTO CDChasználatáról további információkat a „AUTO CDC API-k: Az adatrögzítés egyszerűsítése folyamatokkal” című témakörben talál. - További részletekért
CREATE FLOWlásd a CREATE FLOW (pipeline-ek) című témakört.
Szemantika
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
A cél adatminőségi korlátozásait ugyanazzal CONSTRAINT a záradékkal határozhatja meg, mint a többi folyamat lekérdezését. Lásd: Adatminőség kezelése folyamatelvárásokkal.
Az INSERT és UPDATE események alapértelmezett viselkedése az, hogy a forrásból származó CDC-eseményeket frissíti vagy beszúrja: frissíti a céltábla azon sorait, amelyek megfelelnek a megadott kulcs(ok)nak, vagy új sort szúr be, ha egyező rekord nem található a céltáblában.
DELETE események kezelése a APPLY AS DELETE WHEN feltétellel meghatározható.
Fontos
A módosítások alkalmazásához deklarálnia kell egy célstreamelési táblát. Igény szerint megadhatja a céltábla sémáját. A 2. típusú SCD-táblák esetében a céltábla sémájának megadásakor a __START_AT mezővel azonos adattípusú __END_AT és sequence_by oszlopokat is tartalmaznia kell.
Lásd az AUTO CDC API-k: Egyszerűsítse a változáskövető adatrögzítést a csővezetékekkel.
Paraméterek
flow_nameA létrehozandó folyamat neve.
sourceAz adatok forrása. A forrásnak streamelési forrásnak kell lennie. A STREAM kulcsszóval stream-szemantikát használhat a forrásból való olvasáshoz. Ha az olvasás egy meglévő rekord módosítását vagy törlését tapasztalja, hibaüzenet jelenik meg. A legbiztonságosabb, ha statikus vagy csak hozzáfűző forrásokból olvas. A módosítási véglegesítéseket tartalmazó adatok betöltéséhez használhatja a Pythont és a
SkipChangeCommitshibák kezelésére szolgáló lehetőséget.További információért az adatfolyamokról, tekintse meg a Folyamatokkal történő adatátalakítást.
KEYSAzok az oszlopok vagy oszlopok kombinációja, amelyek egyedileg azonosítják a forrásadatok sorait. Az oszlopokban szereplő értékek segítségével azonosíthatja, hogy mely CDC-események vonatkoznak a céltábla adott rekordjaira.
Az oszlopok kombinációjának meghatározásához használjon vesszővel tagolt oszloplistát.
Ez a záradék kötelező.
IGNORE NULL UPDATESLehetővé teszi a céloszlopok egy részhalmazát tartalmazó frissítések betöltését. Ha egy CDC-esemény megfelel egy meglévő sornak, és a NULL FRISSÍTÉSEK FIGYELMEN KÍVÜL HAGYása beállítás meg van adva, az értékekkel rendelkező
nulloszlopok megőrzik a célban meglévő értékeiket. Ez anullértékkel rendelkező beágyazott oszlopokra is vonatkozik.Ez a záradék nem kötelező.
Az alapértelmezett az, hogy a meglévő oszlopokat a
nullértékekkel írjuk felül.APPLY AS DELETE WHENMegadja, hogy a CDC-eseményeket mikor kell
DELETE-ként, és nem upsertként kezelni.A 2. típusú SCD-források esetében a rendezetlen adatok kezelésére a rendszer ideiglenesen megőrzi a törölt sort a háttérben lévő Delta-táblában, és létrehoz egy nézetet a metaadattárban, amely kiszűri ezeket a sírköveket. A megőrzési időköz a
pipelines.cdc.tombstoneGCThresholdInSecondshasználatával konfigurálható.Ez a záradék nem kötelező.
APPLY AS TRUNCATE WHENMegadja, hogy a CDC-események mikor legyenek teljes táblaként
TRUNCATEkezelve. Mivel ez a záradék a céltábla teljes csonkját aktiválja, csak a funkciót igénylő konkrét használati esetekhez használható.A
APPLY AS TRUNCATE WHENzáradék csak az 1. SCD-típus esetében támogatott. A 2. SCD-típus nem támogatja a csonkolási műveletet.Ez a záradék nem kötelező.
SEQUENCE BYAz oszlop neve, amely a CDC-események logikai sorrendjét adja meg a forrásadatokban. A pipeline feldolgozás ezzel a szekvenálással kezeli a nem sorrendben érkező változási eseményeket.
Ha több oszlopra van szükség a szekvenáláshoz, használjon egy
STRUCTkifejezést: először az első struct mező, majd a második mező szerint rendezi, ha van egy döntetlen, és így tovább.A megadott oszlopoknak rendezhető adattípusoknak kell lenniük.
Ez a záradék kötelező.
COLUMNSA céltáblában szerepeltetni kívánt oszlopok egy részhalmazát adja meg. A következő lehetőségek közül választhat:
- Adja meg a belefoglalandó oszlopok teljes listáját:
COLUMNS (userId, name, city). - Adja meg a kizárandó oszlopok listáját:
COLUMNS * EXCEPT (operation, sequenceNum)
Ez a záradék nem kötelező.
Az alapértelmezett érték az, hogy az összes oszlopot belefoglalja a céltáblába, ha a
COLUMNSzáradék nincs megadva.- Adja meg a belefoglalandó oszlopok teljes listáját:
STORED ASA rekordok tárolása 1. vagy 2. SCD-típusként.
Ez a záradék nem kötelező.
Az alapértelmezett scd típus 1.
TRACK HISTORY ONA kimeneti oszlopok egy részhalmazát adja meg, amely előzményrekordokat hoz létre a megadott oszlopok módosításakor. A következő lehetőségek közül választhat:
- Adja meg a követendő oszlopok teljes listáját:
COLUMNS (userId, name, city). - Adja meg a nyomon követésből kizárandó oszlopok listáját:
COLUMNS * EXCEPT (operation, sequenceNum)
Ez a záradék nem kötelező. Az alapértelmezett beállítás az összes kimeneti oszlop előzményeinek nyomon követése, ha bármilyen változás történt, ami egyenértékű azokkal
TRACK HISTORY ON *.- Adja meg a követendő oszlopok teljes listáját:
Példák
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);