Delen via


CREATE STREAMING TABLE ... STROOM AUTO CDC

Van toepassing op:aangevinkt als ja Databricks SQL

Belangrijk

Deze functie bevindt zich in de bètaversie. Vereist Databricks Runtime 17.3 en hoger.

Gebruik de FLOW AUTO CDC component waarmee CREATE STREAMING TABLE u CDC-records (Change Data Capture) van een bron in een streamingtabel wilt verwerken.

Voorheen werd de MERGE INTO instructie vaak gebruikt voor het verwerken van CDC-records in Azure Databricks. MERGE INTO kan echter onjuiste resultaten produceren vanwege niet-opeenvolgende records of kan complexe logica vereisen om records opnieuw te ordenen.

AUTO CDC vereenvoudigt CDC door automatisch out-of-orderrecords af te handelen. U geeft sleutels op voor het identificeren van records, een reekskolom voor ordenen en of u resultaten wilt opslaan als SCD-type 1 (directe updates) of SCD-type 2 (geschiedenistracering).

Syntaxis

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Het standaardgedrag voor INSERT en UPDATE gebeurtenissen is om CDC-gebeurtenissen uit de bron te upsert: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutels of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN voorwaarde.

Parameterwaarden

  • source

    De bron voor de gegevens. De bron moet een streamingbron zijn. Gebruik het STREAM trefwoord om streamingsemantiek toe te passen en uit de bron te lezen. Als de leesbewerking een wijziging of verwijdering van een bestaande record tegenkomt, wordt er een fout gegenereerd. Het is het veiligst om te lezen uit statische of alleen bij te voegen bronnen.

    Zie Gegevens transformeren met pijplijnenvoor meer informatie over het streamen van gegevens.

  • KEYS

    De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. De waarden in deze kolommen worden gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.

    Als u een combinatie van kolommen wilt definiëren, gebruikt u een door komma's gescheiden lijst met kolommen.

    Deze clausule is vereist.

  • IGNORE NULL UPDATES

    Hiermee kunt u updates opnemen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en IGNORE NULL UPDATES wordt opgegeven, behouden kolommen met een null waarde hun bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met een null waarde.

    Deze component is optioneel.

    De standaardinstelling is om bestaande kolommen met null waarden te overschrijven.

  • APPLY AS DELETE WHEN

    Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE in plaats van een upsert.

    Voor SCD-bronnen van type 2 wordt de verwijderde rij tijdelijk bewaard als een "tombstone" in de onderliggende Delta-tabel. In de metastore wordt een weergave gemaakt die deze tombstones eruit filtert. Het bewaarinterval kan worden geconfigureerd met de pipelines.cdc.tombstoneGCThresholdInSecondstabeleigenschap.

    Deze component is optioneel.

  • APPLY AS TRUNCATE WHEN

    Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE. Omdat deze clausule een volledige truncatie van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases die deze functionaliteit vereisen.

    De APPLY AS TRUNCATE WHEN component wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor de truncatie-operatie.

    Deze component is optioneel.

  • SEQUENCE BY

    De kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. De pijplijnverwerking maakt gebruik van deze volgordebepaling om wijzigingsevenementen die buiten volgorde aankomen, af te handelen.

    Als er meerdere kolommen nodig zijn voor sequentiëren, gebruikt u een STRUCT expressie: deze wordt eerst gerangschikt op het eerste structveld, vervolgens door het tweede veld als er een gelijkspel is, enzovoort.

    Opgegeven kolommen moeten sorteerbare gegevenstypen zijn.

    Deze clausule is vereist.

  • COLUMNS

    Hiermee geeft u een subset van kolommen op die moeten worden opgenomen in de doeltabel. U kunt het volgende doen:

    • Geef de volledige lijst met kolommen op die u wilt opnemen: COLUMNS (userId, name, city).
    • Geef een lijst met kolommen op die u wilt uitsluiten: COLUMNS * EXCEPT (operation, sequenceNum)

    Deze component is optioneel.

    De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer de COLUMNS component niet is opgegeven.

  • STORED AS

    Om te bepalen of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.

    Deze component is optioneel.

    De standaardwaarde is SCD type 1.

  • TRACK HISTORY ON

    Hiermee geeft u een subset van uitvoerkolommen op voor het genereren van geschiedenisrecords wanneer er wijzigingen zijn in die opgegeven kolommen. U kunt het volgende doen:

    • Geef de volledige lijst met kolommen op die u wilt bijhouden: COLUMNS (userId, name, city).
    • Geef een lijst op met kolommen die moeten worden uitgesloten van het bijhouden: COLUMNS * EXCEPT (operation, sequenceNum)

    Deze component is optioneel. De standaardinstelling is het bijhouden van de geschiedenis voor alle uitvoerkolommen wanneer er wijzigingen zijn, gelijk aan TRACK HISTORY ON *.

Examples

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);