Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Declaratieve Lakeflow Spark-pijplijnen vereenvoudigen het vastleggen van wijzigingsgegevens (CDC) met de AUTO CDC- en AUTO CDC FROM SNAPSHOT-API. Deze API's automatiseren de complexiteit van het berekenen van langzaam veranderende dimensies (SCD) Type 1 en Type 2 vanuit een CDC-feed of databasemomentopnamen. Zie Gegevens vastleggen en momentopnamen wijzigen voor meer informatie over deze concepten.
Opmerking
De AUTO CDC API's vervangen de APPLY CHANGES API's en hebben dezelfde syntaxis. De APPLY CHANGES API's zijn nog steeds beschikbaar, maar Databricks raadt het gebruik van de AUTO CDC API's aan.
De API die u gebruikt, is afhankelijk van de bron van uw wijzigingsgegevens:
-
AUTO CDC: Gebruik deze optie wanneer voor de brondatabase een CDC-feed is ingeschakeld.AUTO CDCverwerkt wijzigingen uit een gegevenswijzigingenfeed (CDF). Het wordt ondersteund in zowel de SQL-pijplijn- als Python-interfaces. -
AUTO CDC FROM SNAPSHOT: Gebruik deze optie wanneer CDC niet is ingeschakeld voor de brondatabase en alleen momentopnamen beschikbaar zijn. Met deze API worden momentopnamen vergeleken om wijzigingen te bepalen en vervolgens worden ze verwerkt. Het wordt alleen ondersteund in de Python-interface.
Beide API's ondersteunen het bijwerken van tabellen met SCD-type 1 en Type 2:
- Gebruik SCD Type 1 om records rechtstreeks bij te werken. Geschiedenis wordt niet bewaard voor bijgewerkte gegevens.
- Gebruik SCD Type 2 om een geschiedenis van records te bewaren, hetzij op alle soorten updates of op updates voor een opgegeven set kolommen.
De AUTO CDC API's worden niet ondersteund door declaratieve Apache Spark-pijplijnen.
Zie AUTO CDC INTO (pijplijnen),create_auto_cdc_flow en create_auto_cdc_from_snapshot_flow voor syntaxis en andere verwijzingen.
Opmerking
Op deze pagina wordt beschreven hoe u tabellen in uw pijplijnen bijwerkt op basis van wijzigingen in brongegevens. Zie Delta Lake-wijzigingenfeed gebruiken in Azure Databricks voor informatie over het vastleggen en opvragen van gegevens op rijniveau voor Delta-tabellen.
Requirements
Als u de CDC-API's wilt gebruiken, moet uw pijplijn zijn geconfigureerd voor het gebruik van serverloze SDP of de SDP Pro of Advancededities.
Hoe AUTO CDC werkt
Als u CDC-verwerking wilt uitvoeren, maakt u een streamingtabel met AUTO CDC en gebruikt u vervolgens de AUTO CDC ... INTO-instructie in SQL of de create_auto_cdc_flow()-functie in Python om de bron, sleutels en volgordebepaling voor de wijzigingenfeed op te geven. Zie Gegevens vastleggen en momentopnamen wijzigen voor een uitleg van de werking van sequentiëren en SCD-logica. Zie de AUTO CDC-voorbeelden.
Voor initiële hydratatie vanuit een bron met een change feed, maakt u gebruik van AUTO CDC met een once-stroom en vervolgt u het verwerken van de change feed. Zie Een externe RDBMS-tabel repliceren met behulp van AUTO CDC.
Zie AUTO CDC INTO (pijplijnen) of create_auto_cdc_flow voor syntaxisdetails.
Hoe AUTO CDC FROM SNAPSHOT werkt
AUTO CDC FROM SNAPSHOT bepaalt wijzigingen in brongegevens door momentopnamen in volgorde te vergelijken. Deze wordt alleen ondersteund in de Python-pijplijninterface. U kunt momentopnamen uit een Delta-tabel, cloudopslagbestanden of JDBC rechtstreeks lezen.
Als u AUTO CDC FROM SNAPSHOT voor CDC-verwerking wilt uitvoeren, maakt u een streamingtabel aan en gebruikt u vervolgens de create_auto_cdc_from_snapshot_flow()-functie om de momentopname, sleutels en andere argumenten op te geven. Zie Momentopnameverwerkingspatronen voor meer informatie over de twee innamepatronen en wanneer u elk moet gebruiken. Zie de voorbeelden van AUTO CDC FROM SNAPSHOT.
Zie create_auto_cdc_from_snapshot_flow voor meer informatie over de syntaxis.
Meerdere kolommen gebruiken voor sequentiëren
Als u wilt sorteren op meerdere kolommen (bijvoorbeeld een tijdstempel en een ID om de volgorde te bepalen), gebruikt u een STRUCT om deze te combineren. De API sorteert eerst op het eerste veld, en in het geval van een gelijke waarde, op het tweede veld, enzovoort.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
Auto CDC-voorbeelden
In de volgende voorbeelden ziet u de verwerking van SCD Type 1 en Type 2 met behulp van een gegevensfeedbron voor wijzigingen. Met de voorbeeldgegevens worden nieuwe gebruikersrecords gemaakt, een gebruikersrecord verwijderd en worden gebruikersrecords bijgewerkt. In het SCD-type 1-voorbeeld komen de laatste UPDATE bewerkingen te laat en worden ze verwijderd uit de doeltabel, waarmee de verwerking van gebeurtenissen buiten de order wordt gedemonstreerd.
Hier volgen de invoerrecords die in deze voorbeelden worden gebruikt. Deze gegevens worden gemaakt door de query uit te voeren in de sectie Voorbeeldgegevens maken .
| userId | naam | city | operatie | volgordenummer |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lelie | Cancun | INSERT | 2 |
| 123 | Nul | Nul | Verwijderen | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Als u de opmerkingen bij de laatste rij in de query voor het genereren van voorbeeldgegevens ongedaan maakt, wordt de volgende record ingevoegd die aangeeft dat de tabel moet worden afgekapt (wis de tabel) op sequenceNum=3:
| userId | naam | city | operatie | volgordenummer |
|---|---|---|---|---|
| Nul | Nul | Nul | AFKNOTTEN | 3 |
Opmerking
Alle volgende voorbeelden bevatten opties om zowel als DELETETRUNCATE bewerkingen op te geven, maar elk is optioneel.
Voorbeeldgegevens maken
Voer de volgende instructies uit om een voorbeeldgegevensset te maken. Deze code is niet bedoeld om te worden uitgevoerd als onderdeel van een pijplijndefinitie. Voer deze uit vanuit de verkenningsmap van uw pijplijn in plaats van de map transformaties.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
SCD Type 1-updates verwerken
SCD Type 1 bewaart alleen de nieuwste versie van elke record. In het volgende voorbeeld wordt de bovenstaande wijzigingengegevensstroom gelezen en worden wijzigingen toegepast op een streamingtabeldoel. Ontwikkel declaratieve Pijplijnen van Lakeflow Spark om deze code uit te voeren.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Nadat het SCD-type 1-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:
| userId | naam | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lelie | Cancun |
Gebruiker 123 (Isabel) is verwijderd en wordt niet weergegeven. Gebruiker 125 (Mercedes) toont alleen de meest recente stad (Guadalajara) omdat SCD Type 1 eerdere waarden overschrijft. De eerdere UPDATE bij sequenceNum=5 is verwijderd omdat er later een update bij sequenceNum=6 arriveerde.
Nadat u het voorbeeld hebt uitgevoerd met de TRUNCATE record niet van commentaar voorzien, wordt bij sequenceNum=3 de tabel gewist. Dit betekent dat records 124 en 126 zich niet in de tabel bevinden en dat de uiteindelijke doeltabel alleen de volgende record bevat:
| userId | naam | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
SCD Type 2-updates verwerken
SCD Type 2 behoudt een volledige geschiedenis van wijzigingen door nieuwe rijen te maken voor elke versie van een record, met __START_AT en __END_AT kolommen die aangeven wanneer elke versie actief was.
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Nadat het SCD-type 2-voorbeeld is uitgevoerd, bevat de doeltabel de volgende records:
| userId | naam | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | Nul |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | Nul |
| 126 | Lelie | Cancun | 2 | Nul |
De tabel behoudt de volledige geschiedenis. Gebruiker 123 heeft twee versies (beëindigd op reeks 6 wanneer deze is verwijderd). Gebruiker 125 heeft drie versies met stadswijzigingen. Records met __END_AT = null zijn momenteel actief.
Een kolomsubset bijhouden met SCD Type 2
SCD Type 2 maakt standaard een nieuwe versie wanneer een kolomwaarde wordt gewijzigd. U kunt een subset van kolommen opgeven die u wilt bijhouden, zodat wijzigingen in andere kolommen de huidige versie bijwerken in plaats van een nieuwe geschiedenisrecord te genereren.
In het volgende voorbeeld wordt de city kolom uitgesloten van het bijhouden van geschiedenis:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Omdat city wijzigingen niet worden bijgehouden, overschrijven stadsupdates de huidige rij in plaats van een nieuwe versie te maken. De doeltabel bevat de volgende records:
| userId | naam | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | Nul |
| 125 | Mercedes | Guadalajara | 2 | Nul |
| 126 | Lelie | Cancun | 2 | Nul |
VOORBEELDEN VAN AUTO CDC FROM SNAPSHOT
De volgende secties bieden voorbeelden van het gebruik van AUTO CDC FROM SNAPSHOT om momentopnamen te verwerken tot SCD Type 1- of Type 2-doeltabellen. Zie Gegevens vastleggen en momentopnamen wijzigen voor achtergrondinformatie over het gebruik van deze API.
Voorbeeld: Momentopnamen verwerken met behulp van tijd voor pijplijnopname
Gebruik deze methode wanneer momentopnamen regelmatig en in volgorde binnenkomen en u kunt vertrouwen op de tijdstempel van de pijplijnuitvoering voor versiebeheer. Bij elke pijplijnupdate wordt een nieuwe momentopname opgenomen.
U kunt momentopnamen van meerdere brontypen lezen, waaronder Delta-tabellen, cloudopslagbestanden en JDBC-verbindingen.
Stap 1: Voorbeeldgegevens maken
Maak een tabel met momentopnamegegevens. Voer de volgende code uit vanuit een notebook of Databricks SQL in de explorations map van uw pijplijn:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Stap 2: AUTO CDC UITVOEREN VANAF MOMENTOPNAME
Ontwikkel declaratieve Pijplijnen van Lakeflow Spark om de code in deze stap uit te voeren.
Kies een brontype voor de weergave van momentopname (de voorbeeldcode genereert een Delta-tabel):
Optie A: Lezen uit een Delta-tabel
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Optie B: Lezen uit cloudopslag
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Optie C: Lezen uit JDBC (alleen klassiek berekenen)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Alle opties naar het doel schrijven
Voeg vervolgens de doeltabel toe en voeg de stroom toe:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Na de eerste pijplijnuitvoering worden alle records als actieve rijen ingevoegd.
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | Nul |
| 2 | Monterrey | 0 | Nul |
| 3 | Tijuana | 0 | Nul |
Opmerking
Als u in plaats daarvan SCD Type 1 wilt gebruiken en alleen de huidige status wilt behouden, stelt u in stored_as_scd_type=1. In dit geval bevat de doeltabel de kolommen __START_AT en __END_AT niet.
Stap 3: Een nieuwe momentopname simuleren en opnieuw uitvoeren
Werk de brontabel bij om een nieuwe momentopname te simuleren die binnenkomt (voer deze code uit vanuit een notebook of SQL-bestand in de explorations map van uw pipline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Voer de pijplijn opnieuw uit.
AUTO CDC FROM SNAPSHOT vergelijkt de nieuwe momentopname met de vorige en detecteert dat gebruiker 1 is verwijderd, gebruikers 2 en 3 zijn bijgewerkt en gebruikers 4 en 6 zijn ingevoegd. Hiermee wordt een wijzigingenfeed gegenereerd en gebruikt AUTO CDC om de uitvoertabel te maken.
Na de tweede iteratie met SCD Type 2 bevat de doeltabel de volgende gegevens:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | Nul |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | Nul |
| 4 | Death Valley | 1 | Nul |
| 6 | Kings Canyon | 1 | Nul |
Gebruiker 1 is beëindigd (verwijderd). Gebruikers 2 en 3 hebben elk twee versies met hun stadswijzigingen. Gebruikers 4 en 6 zijn nieuw ingevoegd.
Na de tweede uitvoering met SCD Type 1 geeft de doeltabel alleen de huidige status weer:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Death Valley |
| 6 | Kings Canyon |
Voorbeeld: Momentopnamen verwerken met behulp van versiefuncties
Gebruik deze methode wanneer u expliciet controle nodig hebt over het ordenen van momentopnamen. Gebruik deze benadering bijvoorbeeld wanneer meerdere momentopnamen tegelijkertijd aankomen of momentopnamen niet in de volgorde komen. U schrijft een functie die aangeeft welke momentopname moet worden verwerkt volgende en het bijbehorende versienummer. De API verwerkt momentopnamen in oplopende versievolgorde:
- Als meerdere momentopnamen zich in de opslag bevinden, worden ze allemaal op volgorde verwerkt.
- Als een momentopname niet in orde komt (bijvoorbeeld
snapshot_3komt nasnapshot_4), wordt deze overgeslagen. - Als er geen nieuwe momentopnamen zijn, wordt de functie geretourneerd
Noneen vindt er geen verwerking plaats.
Stap 1: Momentopnamebestanden voorbereiden
Maak CSV-bestanden met momentopnamegegevens en voeg ze toe aan een volume- of cloudopslaglocatie. Geef de bestanden chronologisch een naam (bijvoorbeeld snapshot_1.csv, snapshot_2.csv).
Elk bestand moet kolommen bevatten voor userId en city. Voorbeeld:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Death Valley |
Stap 2: AUTO CDC FROM SNAPSHOT uitvoeren met een versiefunctie
Maak een nieuw notebook en plak de volgende pijplijncode. Ontwikkel vervolgens declaratieve Pijplijnen van Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Opmerking
Als u in plaats daarvan SCD Type 1 wilt gebruiken, stelt u in stored_as_scd_type=1.
Na de verwerking snapshot_1.csv bevat de doeltabel de volgende records:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | Nul |
| 2 | Monterrey | 1 | Nul |
| 3 | Tijuana | 1 | Nul |
Na de verwerking snapshot_2.csv bevat de doeltabel de volgende records:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | Nul |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | Nul |
| 4 | Death Valley | 2 | Nul |
Opmerking
Houd er rekening mee dat voor SCD Type 1 de tabel er precies zo uitziet als de meest recente momentopname. Het verschil is dat downstreamquery's de wijzigingenfeed kunnen gebruiken om alleen gewijzigde records te verwerken.
Stap 3: Nieuwe momentopnamen toevoegen
Voeg een nieuw CSV-bestand toe aan de opslaglocatie met gewijzigde gegevens (bijvoorbeeld gewijzigde plaatswaarden, nieuwe rijen of verwijderde rijen). Voer vervolgens de pijplijn opnieuw uit om de nieuwe momentopname te verwerken.
Beperkingen
- De kolom voor sequentiëren moet een sorteerbaar gegevenstype zijn.
NULLSequentiërende waarden worden niet ondersteund. -
AUTO CDC FROM SNAPSHOTwordt alleen ondersteund in de Python-pijplijninterface; de SQL-interface wordt niet ondersteund.
Aanvullende bronnen
- Gegevens vastleggen en momentopnamen wijzigen: meer informatie over CDC-concepten, momentopnamen en SCD-typen.
-
Repliceer een externe RDBMS-tabel met behulp van
AUTO CDC: Leer hoe u initiële hydratatie met eenoncestroom uitvoert en vervolgens doorgaat met het verwerken van wijzigingen. - Geavanceerde onderwerpen over AUTO CDC: Leer over wijzigingsbewerkingen op AUTO CDC-doelen, het lezen van wijzigingsgegevensfeeds en verwerkingsmetriek.
- Zelfstudie: Een ETL-pijplijn bouwen met behulp van change data capture