AUTO CDC INTO (pipelines)

Använd instruktionen AUTO CDC ... INTO för att skapa ett flöde som använder Lakeflow Spark Deklarativa pipelines för ändringsdatainsamling (CDC). Den här instruktionen läser ändringar från en CDC-källa och tillämpar dem på ett strömningsmål.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Du definierar datakvalitetsbegränsningar för målet med samma CONSTRAINT sats som andra pipelinefrågor. Se avsnittet Hantera datakvalitet med pipeline-förväntningar.

Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Viktigt!

Du måste deklarera en måldataströmstabell för att tillämpa ändringarna på den. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen för SCD-typ 2-tabeller måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som sequence_by fältet.

Se API:er för AUTOMATISK CDC: Förenkla insamling av ändringsdata med pipelines.

Parameterar

  • flow_name

    Namnet på flödet som ska skapas.

  • source

    Datakällan. Källan måste vara en strömmande källa. Använd stream-nyckelordet för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor. Om du vill mata in data som har ändringsincheckningar kan du använda Python och SkipChangeCommits alternativet för att hantera fel.

    Mer information om strömmande data finns i Transformera data med pipelines.

  • KEYS

    Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Värdena i dessa kolumner används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

    Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.

    Den här satsen är nödvändig.

  • IGNORE NULL UPDATES

    Tillåter inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES har angetts behåller kolumner med ett null värde sina befintliga värden i målet. Detta gäller även kapslade kolumner med ett null värde.

    Den här satsen är valfri.

    Standardvärdet är att skriva över befintliga kolumner med null värden.

  • APPLY AS DELETE WHEN

    Anger när en CDC-händelse bör behandlas som en DELETE i stället för en uppdatering.

    För SCD-typ 2-källor behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen för att hantera oordnade data, och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskapen.

    Den här satsen är valfri.

  • APPLY AS TRUNCATE WHEN

    Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

    APPLY AS TRUNCATE WHEN Satsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunceringsoperationen.

    Den här satsen är valfri.

  • SEQUENCE BY

    Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Bearbetningen i pipelinen använder den här sekvenseringen för att hantera ändringshändelser som kommer i oordning.

    Om flera kolumner behövs för sekvensering använder du ett STRUCT uttryck: det sorteras efter det första structfältet först, sedan efter det andra fältet om det finns ett oavgjort resultat och så vidare.

    Angivna kolumner måste vara sorterbara datatyper.

    Den här satsen krävs.

  • COLUMNS

    Anger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:

    • Ange den fullständiga listan med kolumner som ska inkluderas: COLUMNS (userId, name, city).
    • Ange en lista med kolumner som ska undantas: COLUMNS * EXCEPT (operation, sequenceNum)

    Den här satsen är valfri.

    Standardvärdet är att inkludera alla kolumner i måltabellen COLUMNS när satsen inte har angetts.

  • STORED AS

    Om poster ska lagras som SCD-typ 1 eller SCD-typ 2.

    Den här satsen är valfri.

    Standardvärdet är SCD typ 1.

  • TRACK HISTORY ON

    Anger en delmängd av resultatkolumner för att generera historikposter när det sker ändringar i de specificerade kolumnerna. Du kan antingen:

    • Ange den fullständiga listan med kolumner som ska spåras: COLUMNS (userId, name, city).
    • Ange en lista över kolumner som ska undantas från spårning: COLUMNS * EXCEPT (operation, sequenceNum)

    Den här satsen är valfri. Standardvärdet är att spåra historiken för alla utdatakolumner när det finns ändringar, motsvarande TRACK HISTORY ON *.

Examples

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);