Sdílet prostřednictvím


AUTO CDC INTO (kanály)

Pomocí příkazu AUTO CDC ... INTO vytvořte tok, který používá funkcionalitu deklarativních potrubí Lakeflow Spark pro zachytávání změn dat (CDC). Tento příkaz čte změny ze zdroje CDC a použije je na cíl streamování.

Syntaxe

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Omezení kvality dat pro cíl definujete pomocí stejné CONSTRAINT klauzule jako jiné dotazy kanálu. Viz Spravujte kvalitu dat pomocí požadavků na datový potrubí.

Výchozí chování INSERT a UPDATE události slouží k přenesení událostí CDC ze zdroje: aktualizujte všechny řádky v cílové tabulce, které odpovídají zadaným klíčům, nebo vložte nový řádek, pokud v cílové tabulce neexistuje odpovídající záznam. Zpracování událostí DELETE lze zadat pomocí APPLY AS DELETE WHEN podmínky.

Důležité

Abyste mohli změny použít, musíte deklarovat cílovou streamovací tabulku. Volitelně můžete zadat schéma cílové tabulky. U tabulek SCD typu 2 je nutné při zadávání schématu cílové tabulky zahrnout také sloupce __START_AT a __END_AT se stejným datovým typem jako pole sequence_by.

Viz rozhraní API AUTO CDC: Zjednodušte zachytávání změn dat pomocí pipelin.

Parametry

  • flow_name

    Název toku, který chcete vytvořit.

  • source

    Zdroj dat. Zdrojem musí být zdroj streamování . Pomocí klíčového slova STREAM můžete ke čtení ze zdroje použít sémantiku streamování. Pokud čtení narazí na změnu nebo odstranění existujícího záznamu, vyvolá se chyba. Je nejbezpečnější číst ze statických nebo doplňovacích zdrojů. K ingestci dat, která mají commity změn, můžete použít Python a možnost SkipChangeCommits pro zpracování chyb.

    Další informace o streamovaných datech najdete v tématu Transformace dat pomocí kanálů.

  • KEYS

    Sloupec nebo kombinace sloupců, které jednoznačně identifikují řádek ve zdrojových datech. Hodnoty v těchto sloupcích slouží k identifikaci událostí CDC, které se vztahují na konkrétní záznamy v cílové tabulce.

    Pokud chcete definovat kombinaci sloupců, použijte čárkami oddělený seznam sloupců.

    Tato klauzule je povinná.

  • IGNORE NULL UPDATES

    Umožňuje ingestování aktualizací obsahujících podmnožinu cílových sloupců. Pokud se událost CDC shoduje s existujícím řádkem a je zadána funkce IGNORE NULL UPDATES, sloupce s null hodnotou si zachovají své stávající hodnoty v cíli. To platí i pro vnořené sloupce s null hodnotou.

    Tato klauzule je nepovinná.

    Výchozí hodnotou je přepsání existujících sloupců null hodnotami.

  • APPLY AS DELETE WHEN

    Určuje, kdy se má událost CDC považovat za událost DELETE , nikoli jako upsert.

    Pro zdroje typu SCD typu 2 je odstraněný řádek dočasně zachován jako náhrobek v podkladové tabulce Delta a v metastoru se vytvoří zobrazení, které filtruje tyto náhrobky. Interval uchovávání informací lze nakonfigurovat s pipelines.cdc.tombstoneGCThresholdInSecondsvlastností tabulky.

    Tato klauzule je nepovinná.

  • APPLY AS TRUNCATE WHEN

    Určuje, kdy má být událost CDC považována za úplnou tabulku TRUNCATE. Vzhledem k tomu, že tato klauzule aktivuje úplné zkrácení cílové tabulky, měla by být použita pouze pro konkrétní případy použití vyžadující tuto funkci.

    Klauzule APPLY AS TRUNCATE WHEN je podporována pouze pro SCD typu 1. ScD typu 2 nepodporuje operaci zkrácení.

    Tato klauzule je nepovinná.

  • SEQUENCE BY

    Název sloupce určující logické pořadí událostí CDC ve zdrojových datech. Zpracování potrubí používá toto sekvencování ke zpracování událostí změn, které přicházejí ve špatném pořadí.

    Pokud je pro sekvencování potřeba více sloupců, použijte STRUCT výraz: seřadí se podle prvního pole struktury, pak podle druhého pole v případě shody, a tak dále.

    Zadané sloupce musí být řaditelné datové typy.

    Tato klauzule je povinná.

  • COLUMNS

    Určuje podmnožinu sloupců, které se mají zahrnout do cílové tabulky. Máte tyto možnosti:

    • Zadejte úplný seznam sloupců, které se mají zahrnout: COLUMNS (userId, name, city).
    • Zadejte seznam sloupců, které chcete vyloučit: COLUMNS * EXCEPT (operation, sequenceNum)

    Tato klauzule je nepovinná.

    Výchozí hodnota je zahrnout všechny sloupce v cílové tabulce, pokud COLUMNS není klauzule zadána.

  • STORED AS

    Určuje, zda se mají ukládat záznamy jako SCD typu 1 nebo SCD typu 2.

    Tato klauzule je nepovinná.

    Výchozí hodnota je SCD typu 1.

  • TRACK HISTORY ON

    Určuje podmnožinu výstupních sloupců pro generování záznamů historie, pokud dojde k nějakým změnám těchto zadaných sloupců. Máte tyto možnosti:

    • Zadejte úplný seznam sloupců, které chcete sledovat: COLUMNS (userId, name, city).
    • Zadejte seznam sloupců, které se mají vyloučit ze sledování: COLUMNS * EXCEPT (operation, sequenceNum)

    Tato klauzule je nepovinná. Výchozí hodnota je sledovat historii všech výstupních sloupců, pokud dojde ke změnám, které jsou ekvivalentní TRACK HISTORY ON *.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);