Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Een externe RDBMS-tabel repliceren met behulp van
Op deze pagina wordt uitgelegd hoe u een tabel repliceert vanuit een extern relationeel databasebeheersysteem (RDBMS) naar Azure Databricks met behulp van de AUTO CDC API in pijplijnen. U leert het volgende:
- Algemene patronen voor het instellen van de bronnen.
- Een eenmalige volledige kopie van de bestaande gegevens uitvoeren met behulp van een
oncestroom. - Nieuwe wijzigingen continu opnemen met behulp van een
changeproces.
Dit patroon is ideaal voor het bouwen van langzaam veranderende dimensietabellen (SCD) of het synchroniseren van een doeltabel met een extern recordsysteem.
Voordat u begint
In deze handleiding wordt ervan uitgegaan dat u toegang hebt tot de volgende gegevenssets vanuit uw bron:
- Een volledige momentopname van de brontabel in cloudopslag. Deze gegevensset wordt gebruikt voor de eerste belasting.
- Een continue wijzigingsfeed, gevuld in dezelfde cloudopslaglocatie (bijvoorbeeld met behulp van Debezium, Kafka of op logboek gebaseerde CDC). Deze feed is de invoer voor het lopende
AUTO CDCproces.
Bronweergaven instellen
Definieer eerst twee bronweergaven om de rdbms_orders doeltabel te vullen vanuit een cloudopslagpad orders_snapshot_path. Beide zijn gebouwd als streamingweergaven over onbewerkte gegevens in cloudopslag. Het gebruik van weergaven biedt een hogere efficiëntie omdat de gegevens niet hoeven te worden geschreven voordat ze in het AUTO CDC proces worden gebruikt.
- De eerste bronweergave is een volledige momentopname (
full_orders_snapshot) - De tweede is een continue wijzigingsfeed (
rdbms_orders_change_feed).
In de voorbeelden in deze handleiding wordt cloudopslag gebruikt als bron, maar u kunt elke bron gebruiken die wordt ondersteund door streamingtabellen.
full_orders_snapshot()
Met deze stap maakt u een pijplijn met een weergave die de eerste volledige momentopname van de ordergegevens leest.
Python
Het volgende Python-voorbeeld:
- Gebruikt
spark.readStreammet Auto Loader (format("cloudFiles")) - Leest JSON-bestanden uit een map die is gedefinieerd door
orders_snapshot_path - Sets
includeExistingFilesintruezodat historische gegevens die al in het pad zijn aanwezig, worden verwerkt - Stelt
inferColumnTypesin optrueom het schema automatisch af te leiden - Retourneert alle kolommen met
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
In het volgende SQL-voorbeeld worden opties doorgegeven als een kaart met sleutel-waardeparen voor tekenreeksen.
orders_snapshot_path moet beschikbaar zijn als een SQL-variabele (bijvoorbeeld gedefinieerd met behulp van pijplijnparameters of handmatig geïnterpoleerd).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Met deze stap maakt u een tweede weergave die incrementele wijzigingsgegevens leest (bijvoorbeeld uit CDC-logboeken of wijzigingstabellen). Het leest uit orders_cdc_path en gaat ervan uit dat JSON-bestanden in CDC-stijl regelmatig in dit pad worden geplaatst.
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
In het volgende SQL-voorbeeld ${orders_cdc_path} is dit een variabele en kan worden geïnterpoleerd door een waarde in uw pijplijninstellingen in te stellen of expliciet een variabele in uw code in te stellen.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Initiële hydratatie (bij aanvang van de stroom)
Nu de bronnen zijn ingesteld, AUTO CDC voegt de logic beide bronnen samen in een doelstreamingtabel. Gebruik eerst een eenmalige AUTO CDC flow met ONCE=TRUE om de volledige inhoud van de RDBMS-tabel naar een streamingtabel te kopiëren. Hiermee wordt de doeltabel voorbereid met historische gegevens zonder deze in toekomstige updates opnieuw uit te voeren.
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
De once proces wordt slechts één keer uitgevoerd. Nieuwe bestanden die worden toegevoegd full_orders_snapshot nadat de pijplijn is gemaakt, worden genegeerd.
Belangrijk
Wanneer u een volledige vernieuwing uitvoert in de rdbms_orders streamingtabel, wordt het once proces opnieuw uitgevoerd. Als de eerste momentopnamegegevens in cloudopslag zijn verwijderd, leidt dit tot gegevensverlies.
Doorlopende wijzigingenfeed (wijzigingsstroom)
Nadat de eerste momentopname is geladen, gebruikt u een andere AUTO CDC stroom om voortdurend wijzigingen op te nemen van de CDC-feed van RDBMS. Hierdoor blijft uw rdbms_orders tabel up-to-date met invoegingen, updates en verwijderingen.
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Overwegingen
| Aanvullen idempotentie | Een once stroom wordt alleen opnieuw uitgevoerd wanneer de doeltabel volledig wordt vernieuwd. |
|---|---|
| Meerdere stromen | U kunt meerdere wijzigingsstromen gebruiken om samen te voegen in correcties, late binnenkomende gegevens of alternatieve feeds, maar alle moeten een schema en sleutels delen. |
| Volledig vernieuwen | Met een volledige vernieuwing van de rdbms_orders streamingtabel wordt de once flow opnieuw uitgevoerd. Dit kan leiden tot gegevensverlies als de initiële cloudopslaglocatie de initiële momentopnamegegevens heeft verwijderd. |
| Stroomuitvoeringsvolgorde | De volgorde van de stroomuitvoering maakt niet uit. Het eindresultaat is hetzelfde. |
Aanvullende bronnen
- Volledig beheerde SQL Server-connector in Lakeflow Connect