Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Använd instruktionen AUTO CDC ... INTO för att skapa ett flöde som använder Lakeflow Spark Deklarativa pipelines för ändringsdatainsamling (CDC). Den här instruktionen läser ändringar från en CDC-källa och tillämpar dem på ett strömningsmål.
- Mer information om CDC finns i Vad är ändringsdatainsamling (CDC)?.
- Mer information om att använda
AUTO CDCfinns i De AUTOMATISKA CDC API:erna: Förenkla förändringsdatainsamling med pipelines. - Mer information om
CREATE FLOWfinns i CREATE FLOW (pipelines).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Du definierar datakvalitetsbegränsningar för målet med samma CONSTRAINT sats som andra pipelinefrågor. Se avsnittet Hantera datakvalitet med pipeline-förväntningar.
Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .
Viktigt!
Du måste deklarera en måldataströmstabell för att tillämpa ändringarna på den. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen för SCD-typ 2-tabeller måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som sequence_by fältet.
Se API:er för AUTOMATISK CDC: Förenkla insamling av ändringsdata med pipelines.
Parameterar
flow_nameNamnet på flödet som ska skapas.
sourceDatakällan. Källan måste vara en strömmande källa. Använd stream-nyckelordet för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor. Om du vill mata in data som har ändringsincheckningar kan du använda Python och
SkipChangeCommitsalternativet för att hantera fel.Mer information om strömmande data finns i Transformera data med pipelines.
KEYSKolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Värdena i dessa kolumner används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.
Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.
Den här satsen är nödvändig.
IGNORE NULL UPDATESTillåter inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES har angetts behåller kolumner med ett
nullvärde sina befintliga värden i målet. Detta gäller även kapslade kolumner med ettnullvärde.Den här satsen är valfri.
Standardvärdet är att skriva över befintliga kolumner med
nullvärden.APPLY AS DELETE WHENAnger när en CDC-händelse bör behandlas som en
DELETEi stället för en uppdatering.För SCD-typ 2-källor behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen för att hantera oordnade data, och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskapen.Den här satsen är valfri.
APPLY AS TRUNCATE WHENAnger när en CDC-händelse ska behandlas som en fullständig tabell
TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.APPLY AS TRUNCATE WHENSatsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunceringsoperationen.Den här satsen är valfri.
SEQUENCE BYKolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Bearbetningen i pipelinen använder den här sekvenseringen för att hantera ändringshändelser som kommer i oordning.
Om flera kolumner behövs för sekvensering använder du ett
STRUCTuttryck: det sorteras efter det första structfältet först, sedan efter det andra fältet om det finns ett oavgjort resultat och så vidare.Angivna kolumner måste vara sorterbara datatyper.
Den här satsen krävs.
COLUMNSAnger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:
- Ange den fullständiga listan med kolumner som ska inkluderas:
COLUMNS (userId, name, city). - Ange en lista med kolumner som ska undantas:
COLUMNS * EXCEPT (operation, sequenceNum)
Den här satsen är valfri.
Standardvärdet är att inkludera alla kolumner i måltabellen
COLUMNSnär satsen inte har angetts.- Ange den fullständiga listan med kolumner som ska inkluderas:
STORED ASOm poster ska lagras som SCD-typ 1 eller SCD-typ 2.
Den här satsen är valfri.
Standardvärdet är SCD typ 1.
TRACK HISTORY ONAnger en delmängd av resultatkolumner för att generera historikposter när det sker ändringar i de specificerade kolumnerna. Du kan antingen:
- Ange den fullständiga listan med kolumner som ska spåras:
COLUMNS (userId, name, city). - Ange en lista över kolumner som ska undantas från spårning:
COLUMNS * EXCEPT (operation, sequenceNum)
Den här satsen är valfri. Standardvärdet är att spåra historiken för alla utdatakolumner när det finns ändringar, motsvarande
TRACK HISTORY ON *.- Ange den fullständiga listan med kolumner som ska spåras:
Examples
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);