Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Van toepassing op:
Databricks SQL
Belangrijk
Deze functie bevindt zich in de bètaversie. Vereist Databricks Runtime 17.3 en hoger.
Gebruik de FLOW AUTO CDC component waarmee CREATE STREAMING TABLE u CDC-records (Change Data Capture) van een bron in een streamingtabel wilt verwerken.
Voorheen werd de MERGE INTO instructie vaak gebruikt voor het verwerken van CDC-records in Azure Databricks.
MERGE INTO kan echter onjuiste resultaten produceren vanwege niet-opeenvolgende records of kan complexe logica vereisen om records opnieuw te ordenen.
AUTO CDC vereenvoudigt CDC door automatisch out-of-orderrecords af te handelen. U geeft sleutels op voor het identificeren van records, een reekskolom voor ordenen en of u resultaten wilt opslaan als SCD-type 1 (directe updates) of SCD-type 2 (geschiedenistracering).
Syntaxis
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Het standaardgedrag voor INSERT en UPDATE gebeurtenissen is om CDC-gebeurtenissen uit de bron te upsert: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutels of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de APPLY AS DELETE WHEN voorwaarde.
Parameterwaarden
sourceDe bron voor de gegevens. De bron moet een streamingbron zijn. Gebruik het
STREAMtrefwoord om streamingsemantiek toe te passen en uit de bron te lezen. Als de leesbewerking een wijziging of verwijdering van een bestaande record tegenkomt, wordt er een fout gegenereerd. Het is het veiligst om te lezen uit statische of alleen bij te voegen bronnen.Zie Gegevens transformeren met pijplijnenvoor meer informatie over het streamen van gegevens.
KEYSDe kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. De waarden in deze kolommen worden gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel.
Als u een combinatie van kolommen wilt definiëren, gebruikt u een door komma's gescheiden lijst met kolommen.
Deze clausule is vereist.
IGNORE NULL UPDATESHiermee kunt u updates opnemen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en
IGNORE NULL UPDATESwordt opgegeven, behouden kolommen met eennullwaarde hun bestaande waarden in het doel. Dit geldt ook voor geneste kolommen met eennullwaarde.Deze component is optioneel.
De standaardinstelling is om bestaande kolommen met
nullwaarden te overschrijven.APPLY AS DELETE WHENHiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een
DELETEin plaats van een upsert.Voor SCD-bronnen van type 2 wordt de verwijderde rij tijdelijk bewaard als een "tombstone" in de onderliggende Delta-tabel. In de metastore wordt een weergave gemaakt die deze tombstones eruit filtert. Het bewaarinterval kan worden geconfigureerd met de
pipelines.cdc.tombstoneGCThresholdInSecondstabeleigenschap.Deze component is optioneel.
APPLY AS TRUNCATE WHENHiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel
TRUNCATE. Omdat deze clausule een volledige truncatie van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases die deze functionaliteit vereisen.De
APPLY AS TRUNCATE WHENcomponent wordt alleen ondersteund voor SCD-type 1. SCD-type 2 biedt geen ondersteuning voor de truncatie-operatie.Deze component is optioneel.
SEQUENCE BYDe kolomnaam waarmee de logische volgorde van CDC-gebeurtenissen in de brongegevens wordt opgegeven. De pijplijnverwerking maakt gebruik van deze volgordebepaling om wijzigingsevenementen die buiten volgorde aankomen, af te handelen.
Als er meerdere kolommen nodig zijn voor sequentiëren, gebruikt u een
STRUCTexpressie: deze wordt eerst gerangschikt op het eerste structveld, vervolgens door het tweede veld als er een gelijkspel is, enzovoort.Opgegeven kolommen moeten sorteerbare gegevenstypen zijn.
Deze clausule is vereist.
COLUMNSHiermee geeft u een subset van kolommen op die moeten worden opgenomen in de doeltabel. U kunt het volgende doen:
- Geef de volledige lijst met kolommen op die u wilt opnemen:
COLUMNS (userId, name, city). - Geef een lijst met kolommen op die u wilt uitsluiten:
COLUMNS * EXCEPT (operation, sequenceNum)
Deze component is optioneel.
De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer de
COLUMNScomponent niet is opgegeven.- Geef de volledige lijst met kolommen op die u wilt opnemen:
STORED ASOm te bepalen of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2.
Deze component is optioneel.
De standaardwaarde is SCD type 1.
TRACK HISTORY ONHiermee geeft u een subset van uitvoerkolommen op voor het genereren van geschiedenisrecords wanneer er wijzigingen zijn in die opgegeven kolommen. U kunt het volgende doen:
- Geef de volledige lijst met kolommen op die u wilt bijhouden:
COLUMNS (userId, name, city). - Geef een lijst op met kolommen die moeten worden uitgesloten van het bijhouden:
COLUMNS * EXCEPT (operation, sequenceNum)
Deze component is optioneel. De standaardinstelling is het bijhouden van de geschiedenis voor alle uitvoerkolommen wanneer er wijzigingen zijn, gelijk aan
TRACK HISTORY ON *.- Geef de volledige lijst met kolommen op die u wilt bijhouden:
Examples
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);