Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Gäller för:
Databricks SQL
Viktigt!
Den här funktionen finns i Beta. Kräver Databricks Runtime 17.3 och senare.
FLOW AUTO CDC Använd -satsen med CREATE STREAMING TABLE för att bearbeta CDC-poster (Change Data Capture) från en källa till en strömmande tabell.
Tidigare användes instruktionen MERGE INTO ofta för bearbetning av CDC-poster på Azure Databricks. Kan dock MERGE INTO ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräver komplex logik för att ordna om poster.
AUTO CDC förenklar CDC genom att automatiskt hantera out-of-order-poster. Du anger nycklar för att identifiera poster, en sekvenskolumn för beställning och om resultat ska lagras som SCD typ 1 (direktuppdateringar) eller SCD typ 2 (historikspårning).
Syntax
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .
Parameters
sourceDatakällan. Källan måste vara en strömmande källa. Använd nyckelordet
STREAMför att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor.Mer information om strömmande data finns i Transformera data med pipelines.
KEYSKolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Värdena i dessa kolumner används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.
Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.
Den här satsen krävs.
IGNORE NULL UPDATESTillåter inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och
IGNORE NULL UPDATESanges behåller kolumner med ettnullvärde sina befintliga värden i målet. Detta gäller även kapslade kolumner med ettnullvärde.Den här satsen är valfri.
Standardvärdet är att skriva över befintliga kolumner med
nullvärden.APPLY AS DELETE WHENAnger när en CDC-händelse bör behandlas som en
DELETEi stället för en uppdatering.För SCD-typ 2-källor behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen för att hantera oordnade data, och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskapen.Den här satsen är valfri.
APPLY AS TRUNCATE WHENAnger när en CDC-händelse ska behandlas som en fullständig tabell
TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.APPLY AS TRUNCATE WHENSatsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunceringsoperationen.Den här satsen är valfri.
SEQUENCE BYKolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Bearbetningen i pipelinen använder den här sekvenseringen för att hantera ändringshändelser som kommer i oordning.
Om flera kolumner behövs för sekvensering använder du ett
STRUCTuttryck: det sorteras efter det första structfältet först, sedan efter det andra fältet om det finns ett oavgjort resultat och så vidare.Angivna kolumner måste vara sorterbara datatyper.
Den här satsen krävs.
COLUMNSAnger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:
- Ange den fullständiga listan med kolumner som ska inkluderas:
COLUMNS (userId, name, city). - Ange en lista med kolumner som ska undantas:
COLUMNS * EXCEPT (operation, sequenceNum)
Den här satsen är valfri.
Standardvärdet är att inkludera alla kolumner i måltabellen
COLUMNSnär satsen inte har angetts.- Ange den fullständiga listan med kolumner som ska inkluderas:
STORED ASOm poster ska lagras som SCD-typ 1 eller SCD-typ 2.
Den här satsen är valfri.
Standardvärdet är SCD typ 1.
TRACK HISTORY ONAnger en delmängd av resultatkolumner för att generera historikposter när det sker ändringar i de specificerade kolumnerna. Du kan antingen:
- Ange den fullständiga listan med kolumner som ska spåras:
COLUMNS (userId, name, city). - Ange en lista över kolumner som ska undantas från spårning:
COLUMNS * EXCEPT (operation, sequenceNum)
Den här satsen är valfri. Standardvärdet är att spåra historiken för alla utdatakolumner när det finns ändringar, motsvarande
TRACK HISTORY ON *.- Ange den fullständiga listan med kolumner som ska spåras:
Exempel
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);