Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Replizieren einer externen RDBMS-Tabelle mithilfe von
Diese Seite führt Sie durch das Replizieren einer Tabelle aus einem externen relationalen Datenbankverwaltungssystem (RDBMS) in Azure Databricks mithilfe der AUTO CDC API in Pipelines. Sie lernen Folgendes:
- Allgemeine Muster zum Einrichten der Quellen.
- So führen Sie eine einmalige vollständige Kopie der vorhandenen Daten mithilfe eines
onceFlusses aus. - Wie Sie neue Änderungen kontinuierlich mithilfe eines
changeFlusses aufnehmen.
Dieses Muster eignet sich ideal für das Erstellen langsam ändernder Dimensionstabellen (SCD) oder das Synchronisieren einer Zieltabelle mit einem externen Datensatzsystem.
Bevor Sie anfangen
In diesem Leitfaden wird davon ausgegangen, dass Sie zugriff auf die folgenden Datasets aus Ihrer Quelle haben:
- Eine vollständige Momentaufnahme der Quelltabelle im Cloudspeicher. Dieses Dataset wird für die anfängliche Last verwendet.
- Ein fortlaufender Änderungsfeed, der in denselben Cloudspeicherort aufgefüllt wird (z. B. mithilfe von Debezium, Kafka oder protokollbasiertem CDC). Dieser Feed ist die Eingabe für den laufenden
AUTO CDCProzess.
Einrichten von Quellansichten
Definieren Sie zunächst zwei Quellansichten, um die rdbms_orders Zieltabelle aus einem Cloudspeicherpfad orders_snapshot_pathaufzufüllen. Beide werden als Streamingansichten über Rohdaten im Cloudspeicher erstellt. Die Verwendung von Datenansichten bietet eine höhere Effizienz, da die Daten nicht geschrieben werden müssen, bevor sie im AUTO CDC Prozess verwendet werden.
- Die erste Quellansicht ist eine vollständige Momentaufnahme (
full_orders_snapshot) - Die zweite ist ein fortlaufender Änderungsfeed (
rdbms_orders_change_feed).
In den Beispielen in diesem Leitfaden wird Cloudspeicher als Quelle verwendet, Sie können jedoch jede quelle verwenden, die von Streamingtabellen unterstützt wird.
full_orders_snapshot()
In diesem Schritt wird eine Pipeline mit einer Ansicht erstellt, die den anfänglichen vollständigen Schnappschuss der Auftragsdaten liest.
Python
Das folgende Python-Beispiel:
- Verwendet
spark.readStreammit Auto Loader (format("cloudFiles")) - Liest JSON-Dateien aus einem Verzeichnis, das durch
orders_snapshot_pathdefiniert ist. - Setzt
includeExistingFilesauftrue, um sicherzustellen, dass bereits im Pfad vorhandene historische Daten verarbeitet werden - Setzt
inferColumnTypesauftrue, um das Schema automatisch abzuleiten. - Gibt alle Spalten mit
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
Im folgenden SQL-Beispiel werden Optionen als Zeichenfolgen-Schlüsselwertpaare übergeben.
orders_snapshot_path sollte als SQL-Variable verfügbar sein (z. B. mithilfe von Pipelineparametern definiert oder manuell interpoliert).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
In diesem Schritt wird eine zweite Ansicht erstellt, in der inkrementelle Änderungsdaten gelesen werden (z. B. aus CDC-Protokollen oder Änderungstabellen). Es liest aus orders_cdc_path und geht davon aus, dass JSON-Dateien im CDC-Stil regelmäßig in diesem Pfad abgelegt werden.
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
Im folgenden SQL-Beispiel ist ${orders_cdc_path} eine Variable, die interpoliert werden kann, indem ein Wert in den Pipeline-Einstellungen festgelegt oder explizit eine Variable im Code gesetzt wird.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Erste Hydratisierung (einmaliger Fluss)
Nachdem die Quellen eingerichtet wurden, führt die Logik beide Quellen in einer Zielstreamingtabelle zusammen. Verwenden Sie zuerst einen einmaligen AUTO CDC Flow mit ONCE=TRUE , um den gesamten Inhalt der RDBMS-Tabelle in eine Streamingtabelle zu kopieren. Dadurch wird die Zieltabelle mit historischen Daten vorbereitet, ohne sie in zukünftigen Updates wiederzuverwenden.
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Der once Fluss wird nur einmal ausgeführt. Neue Dateien, die full_orders_snapshot nach der Pipelineerstellung hinzugefügt werden, werden ignoriert.
Von Bedeutung
Wenn Sie eine vollständige Aktualisierung der rdbms_orders Streamingtabelle durchführen, wird der once Datenfluss erneut ausgeführt. Wenn die anfänglichen Momentaufnahmendaten im Cloudspeicher entfernt wurden, führt dies zu Datenverlust.
Fortlaufender Änderungsfeed (Änderungsfluss)
Verwenden Sie nach der ersten Momentaufnahme einen anderen AUTO CDC Fluss, um kontinuierlich Änderungen vom CDC-Feed des RDBMS aufzunehmen. Dadurch bleibt Ihre rdbms_orders Tabelle mit Einfügungen, Aktualisierungen und Löschungen auf dem neuesten Stand.
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Überlegungen
| Idempotenz von Backfill | Ein once Ablauf läuft nur erneut, wenn die Zieltabelle vollständig aktualisiert wird. |
|---|---|
| Mehrere Flüsse | Sie können mehrere Änderungsflüsse verwenden, um Korrekturen, spät eintreffende Daten oder alternative Feeds zusammenzuführen, aber alle müssen ein Schema und Schlüssel freigeben. |
| Vollständige Aktualisierung | Eine vollständige Aktualisierung der rdbms_orders Streamingtabelle führt den once Flow erneut aus. Dies kann zu Datenverlust führen, wenn der anfängliche Cloudspeicherort die anfänglichen Momentaufnahmendaten entfernt hat. |
| Ablaufausführungsreihenfolge | Die Reihenfolge der Ablaufausführung spielt keine Rolle. Das Endergebnis ist identisch. |
Weitere Ressourcen
- Vollständig verwalteter SQL Server-Connector in Lakeflow Connect