Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Replikera en extern RDBMS-tabell med hjälp av
Den här sidan beskriver hur du replikerar en tabell från ett externt hanteringssystem för relationsdatabaser (RDBMS) till Azure Databricks med hjälp av API:et AUTO CDC i pipelines. Du får lära dig:
- Vanliga mönster för att konfigurera källorna.
- Så här utför du en engångskopia av befintliga data med hjälp av ett
onceflöde. - Så här matar du in nya ändringar kontinuerligt med hjälp av ett
changeflöde.
Det här mönstret är idealiskt för att skapa scd-tabeller (långsamt föränderliga dimensionstabeller) eller för att hålla en måltabell synkroniserad med ett externt arkivhandlingssystem.
Innan du börjar
Den här guiden förutsätter att du har åtkomst till följande datauppsättningar från din källa:
- En fullständig ögonblicksbild av källtabellen i molnlagringen. Den här datamängden används för den första inläsningen.
- En kontinuerlig ändringsfeed som fylls i på samma molnlagringsplats (till exempel med hjälp av Debezium, Kafka eller loggbaserad CDC). Den här feeden är indata för den pågående
AUTO CDCprocessen.
Konfigurera källvyer
Definiera först två källvyer för att fylla i måltabellen rdbms_orders från en molnlagringssökväg orders_snapshot_path. Båda är byggda som strömmande vyer över rådata i molnlagring. Att använda vyer ger högre effektivitet eftersom data inte behöver skrivas innan de AUTO CDC används i processen.
- Den första källvyn är en fullständig ögonblicksbild (
full_orders_snapshot) - Den andra är en kontinuerlig ändringsfeed (
rdbms_orders_change_feed).
Exemplen i den här guiden använder molnlagring som källa, men du kan använda alla källor som stöds av strömmande tabeller.
full_orders_snapshot()
Det här steget skapar en pipeline med en vy som läser den ursprungliga fullständiga ögonblicksbilden av orderdatat.
python
Följande Python-exempel:
- Använder
spark.readStreammed Auto Loader (format("cloudFiles")) - Läser JSON-filer från en katalog som definierats av
orders_snapshot_path - Anger
includeExistingFilestilltrueför att säkerställa att historiska data som redan finns i sökvägen bearbetas - Ställer in
inferColumnTypestilltrueför att schemat ska härledas automatiskt - Returnerar alla kolumner med
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
I följande SQL-exempel skickas alternativ som en karta över nyckel/värde-strängpar.
orders_snapshot_path vara tillgänglig som en SQL-variabel (till exempel definierad med hjälp av pipelineparametrar eller manuellt interpolerad).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Det här steget skapar en andra vy som läser inkrementella ändringsdata (till exempel från CDC-loggar eller ändringstabeller). Den läser från orders_cdc_path och förutsätter att CDC-liknande JSON-filer släpps i den här sökvägen regelbundet.
python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
I följande SQL-exempel ${orders_cdc_path} är en variabel och kan interpoleras genom att ange ett värde i pipelineinställningarna eller uttryckligen ange en variabel i koden.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Initial hydrering (engångsflöde)
Nu när källorna har konfigurerats AUTO CDC sammanfogar logiken båda källorna till en målströmningstabell. Använd först ett engångsflöde med AUTO CDC för att kopiera det fullständiga innehållet i RDBMS-tabellen ONCE=TRUE till en strömmande tabell. Detta förbereder måltabellen med historiska data utan att spela upp den igen i framtida uppdateringar.
python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Flödet once körs bara en gång. Nya filer som läggs till efter att full_orders_snapshot pipelinen har skapats ignoreras.
Viktigt!
Om du utför en fullständig uppdatering på strömningstabellen rdbms_orders körs flödet once om. Om de första ögonblicksbildsdata i molnlagringen har tagits bort resulterar detta i dataförlust.
Kontinuerlig ändringsfeed (ändringsflöde)
Efter den första ögonblicksbildbelastningen använder du ett annat AUTO CDC flöde för att kontinuerligt mata in ändringar från RDBMS CDC-feed. Detta håller tabellen rdbms_orders uppdaterad med infogningar, uppdateringar och borttagningar.
python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Överväganden
| Idempotens för återfyllnad | Ett once flöde körs bara igen när måltabellen är helt uppdaterad. |
|---|---|
| Flera flöden | Du kan använda flera ändringsflöden för att sammanslå korrigeringar, försenade data eller alternativa feeds, men alla måste dela ett schema och nycklar. |
| Fullständig uppdatering | En fullständig uppdatering av strömningstabellen rdbms_orders kör om flödet once igen. Detta kan leda till dataförlust om den första molnlagringsplatsen har rensat bort de första ögonblicksbildsdata. |
| Flödeskörningsordning | Flödets utförandeordning spelar ingen roll. Slutresultatet är detsamma. |
Ytterligare resurser
- Fullständigt hanterad SQL Server-anslutning i Lakeflow Connect