Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Replikowanie zewnętrznej tabeli RDBMS przy użyciu
Na tej stronie przedstawiono sposób replikowania tabeli z zewnętrznego systemu zarządzania relacyjnymi bazami danych (RDBMS) do usługi Azure Databricks przy użyciu interfejsu AUTO CDC API w potokach. Dowiesz się:
- Typowe wzorce konfigurowania źródeł.
- Jak wykonać jednorazową pełną kopię istniejących danych za pomocą przepływu
once. - Jak ciągle pozyskiwać nowe zmiany przy użyciu przepływu
change.
Ten wzorzec jest idealny do tworzenia wolno zmieniających się tabel wymiarów (SCD) lub zachowywania tabeli docelowej w synchronizacji z zewnętrznym systemem rekordów.
Zanim rozpoczniesz
W tym przewodniku założono, że masz dostęp do następujących zestawów danych ze źródła:
- Pełna migawka tabeli źródłowej w chmurze. Ten zestaw danych jest używany do początkowego ładowania.
- Strumień zmian umieszczony w tej samej lokalizacji magazynu w chmurze (na przykład przy użyciu Debezium, Kafka lub CDC opartego na dzienniku). To źródło danych stanowi dane wejściowe dla trwającego
AUTO CDCprocesu.
Konfigurowanie widoków źródłowych
Najpierw zdefiniuj dwa widoki źródłowe, aby wypełnić tabelę docelową rdbms_orders ze ścieżki orders_snapshot_path magazynu w chmurze. Oba są tworzone jako widoki strumieniowe nad surowymi danymi w magazynie w chmurze. Użycie widoków zapewnia wyższą wydajność, ponieważ dane nie muszą być zapisywane przed użyciem AUTO CDC w procesie.
- Pierwszy widok źródłowy to pełna migawka (
full_orders_snapshot) - Drugi to ciągły strumień zmian (
rdbms_orders_change_feed).
Przykłady w tym przewodniku używają magazynu w chmurze jako źródła, ale można użyć dowolnego źródła obsługiwanego przez tabele przesyłania strumieniowego.
full_orders_snapshot()
Ten krok tworzy strumień, który korzysta z widoku odczytującego początkowy pełny zrzut danych dotyczących zamówień.
Python
Poniższy przykład w języku Python:
- Używa
spark.readStreamz Auto Loaderem (format("cloudFiles")) - Odczytuje pliki JSON z katalogu zdefiniowanego przez
orders_snapshot_path - Ustaw
includeExistingFilesnatrueaby zapewnić, że dane historyczne już obecne w ścieżce są przetwarzane - Ustawia
inferColumnTypesnatruew celu automatycznego wnioskowania schematu - Zwraca wszystkie kolumny z
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
Poniższy przykład SQL przekazuje opcje jako mapę par klucz-wartość ciągu.
orders_snapshot_path powinna być dostępna jako zmienna SQL (na przykład zdefiniowana przy użyciu parametrów potoku lub ręcznie interpolowana).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Ten krok tworzy drugi widok, który odczytuje dane dotyczące przyrostowej zmiany (na przykład z dzienników CDC lub tabel zmian). Czyta z orders_cdc_path i zakłada, że pliki JSON w stylu CDC są regularnie umieszczane na tej ścieżce.
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
W poniższym przykładzie SQL ${orders_cdc_path} jest zmienną, którą można interpolować, ustawiając jej wartość w ustawieniach potoku lub jawnie w kodzie.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Początkowe nawodnienie (po rozpoczęciu przepływu)
Po skonfigurowaniu AUTO CDC źródeł logika scala oba źródła w docelową tabelę przesyłania strumieniowego. Najpierw użyj jednorazowego przepływu AUTO CDC z ONCE=TRUE, aby skopiować pełną zawartość tabeli RDBMS do tabeli strumieniowej. Spowoduje to przygotowanie tabeli docelowej z danymi historycznymi bez ponownego ich odtworzenia w przyszłych aktualizacjach.
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Przepływ once jest uruchamiany tylko raz. Nowe pliki dodawane do full_orders_snapshot po utworzeniu potoku są ignorowane.
Ważne
Wykonanie pełnego odświeżenia w tabeli przesyłania strumieniowego rdbms_orders powoduje ponowne uruchomienie once procesu. Jeśli początkowe dane migawki w magazynie w chmurze zostały usunięte, spowoduje to utratę danych.
Ciągły kanał zmian (przepływ zmian)
Po początkowym załadowaniu migawki użyj innego AUTO CDC przepływu, aby stale pobierać zmiany z kanału CDC RDBMS. Dzięki temu tabela rdbms_orders jest na bieżąco aktualizowana poprzez wstawienia, aktualizacje i usunięcia.
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Rozważania
| Idempotentność uzupełniania danych | Przepływ once jest uruchamiany ponownie tylko wtedy, gdy tabela docelowa jest w pełni odświeżona. |
|---|---|
| Wiele przepływów | Można użyć wielu przepływów zmian, aby scalić korekty, dane opóźnione lub alternatywne źródła danych, ale wszystkie muszą udostępniać schemat i klucze. |
| Pełne odświeżanie | Pełne odświeżenie tabeli przesyłania strumieniowego rdbms_orders powoduje ponowne wykonanie przepływu once. Może to prowadzić do utraty danych, jeśli początkowa lokalizacja magazynu w chmurze usunęła początkowe pliki migawek. |
| Kolejność wykonywania przepływu | Kolejność wykonania przepływu procesów nie ma znaczenia. Wynik końcowy jest taki sam. |
Dodatkowe zasoby
- W pełni zarządzany łącznik programu SQL Server w programie Lakeflow Connect