Dela via


CREATE STREAMING TABLE ... FLOW AUTO CDC

Gäller för:markerad ja Databricks SQL

Viktigt!

Den här funktionen finns i Beta. Kräver Databricks Runtime 17.3 och senare.

FLOW AUTO CDC Använd -satsen med CREATE STREAMING TABLE för att bearbeta CDC-poster (Change Data Capture) från en källa till en strömmande tabell.

Tidigare användes instruktionen MERGE INTO ofta för bearbetning av CDC-poster på Azure Databricks. Kan dock MERGE INTO ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräver komplex logik för att ordna om poster.

AUTO CDC förenklar CDC genom att automatiskt hantera out-of-order-poster. Du anger nycklar för att identifiera poster, en sekvenskolumn för beställning och om resultat ska lagras som SCD typ 1 (direktuppdateringar) eller SCD typ 2 (historikspårning).

Syntax

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Parameters

  • source

    Datakällan. Källan måste vara en strömmande källa. Använd nyckelordet STREAM för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor.

    Mer information om strömmande data finns i Transformera data med pipelines.

  • KEYS

    Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Värdena i dessa kolumner används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

    Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.

    Den här satsen krävs.

  • IGNORE NULL UPDATES

    Tillåter inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES anges behåller kolumner med ett null värde sina befintliga värden i målet. Detta gäller även kapslade kolumner med ett null värde.

    Den här satsen är valfri.

    Standardvärdet är att skriva över befintliga kolumner med null värden.

  • APPLY AS DELETE WHEN

    Anger när en CDC-händelse bör behandlas som en DELETE i stället för en uppdatering.

    För SCD-typ 2-källor behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen för att hantera oordnade data, och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskapen.

    Den här satsen är valfri.

  • APPLY AS TRUNCATE WHEN

    Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

    APPLY AS TRUNCATE WHEN Satsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunceringsoperationen.

    Den här satsen är valfri.

  • SEQUENCE BY

    Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Bearbetningen i pipelinen använder den här sekvenseringen för att hantera ändringshändelser som kommer i oordning.

    Om flera kolumner behövs för sekvensering använder du ett STRUCT uttryck: det sorteras efter det första structfältet först, sedan efter det andra fältet om det finns ett oavgjort resultat och så vidare.

    Angivna kolumner måste vara sorterbara datatyper.

    Den här satsen krävs.

  • COLUMNS

    Anger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:

    • Ange den fullständiga listan med kolumner som ska inkluderas: COLUMNS (userId, name, city).
    • Ange en lista med kolumner som ska undantas: COLUMNS * EXCEPT (operation, sequenceNum)

    Den här satsen är valfri.

    Standardvärdet är att inkludera alla kolumner i måltabellen COLUMNS när satsen inte har angetts.

  • STORED AS

    Om poster ska lagras som SCD-typ 1 eller SCD-typ 2.

    Den här satsen är valfri.

    Standardvärdet är SCD typ 1.

  • TRACK HISTORY ON

    Anger en delmängd av resultatkolumner för att generera historikposter när det sker ändringar i de specificerade kolumnerna. Du kan antingen:

    • Ange den fullständiga listan med kolumner som ska spåras: COLUMNS (userId, name, city).
    • Ange en lista över kolumner som ska undantas från spårning: COLUMNS * EXCEPT (operation, sequenceNum)

    Den här satsen är valfri. Standardvärdet är att spåra historiken för alla utdatakolumner när det finns ändringar, motsvarande TRACK HISTORY ON *.

Exempel

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);