Udostępnij za pośrednictwem


Replikowanie zewnętrznej tabeli RDBMS przy użyciu AUTO CDC

Na tej stronie przedstawiono sposób replikowania tabeli z zewnętrznego systemu zarządzania relacyjnymi bazami danych (RDBMS) do usługi Azure Databricks przy użyciu interfejsu AUTO CDC API w potokach. Dowiesz się:

  • Typowe wzorce konfigurowania źródeł.
  • Jak wykonać jednorazową pełną kopię istniejących danych za pomocą przepływu once.
  • Jak ciągle pozyskiwać nowe zmiany przy użyciu przepływu change.

Ten wzorzec jest idealny do tworzenia wolno zmieniających się tabel wymiarów (SCD) lub zachowywania tabeli docelowej w synchronizacji z zewnętrznym systemem rekordów.

Zanim rozpoczniesz

W tym przewodniku założono, że masz dostęp do następujących zestawów danych ze źródła:

  • Pełna migawka tabeli źródłowej w chmurze. Ten zestaw danych jest używany do początkowego ładowania.
  • Strumień zmian umieszczony w tej samej lokalizacji magazynu w chmurze (na przykład przy użyciu Debezium, Kafka lub CDC opartego na dzienniku). To źródło danych stanowi dane wejściowe dla trwającego AUTO CDC procesu.

Konfigurowanie widoków źródłowych

Najpierw zdefiniuj dwa widoki źródłowe, aby wypełnić tabelę docelową rdbms_orders ze ścieżki orders_snapshot_path magazynu w chmurze. Oba są tworzone jako widoki strumieniowe nad surowymi danymi w magazynie w chmurze. Użycie widoków zapewnia wyższą wydajność, ponieważ dane nie muszą być zapisywane przed użyciem AUTO CDC w procesie.

  • Pierwszy widok źródłowy to pełna migawka (full_orders_snapshot)
  • Drugi to ciągły strumień zmian (rdbms_orders_change_feed).

Przykłady w tym przewodniku używają magazynu w chmurze jako źródła, ale można użyć dowolnego źródła obsługiwanego przez tabele przesyłania strumieniowego.

full_orders_snapshot()

Ten krok tworzy strumień, który korzysta z widoku odczytującego początkowy pełny zrzut danych dotyczących zamówień.

Python

Poniższy przykład w języku Python:

  • Używa spark.readStream z Auto Loaderem (format("cloudFiles"))
  • Odczytuje pliki JSON z katalogu zdefiniowanego przez orders_snapshot_path
  • Ustaw includeExistingFiles na true aby zapewnić, że dane historyczne już obecne w ścieżce są przetwarzane
  • Ustawia inferColumnTypes na true w celu automatycznego wnioskowania schematu
  • Zwraca wszystkie kolumny z .select("\*")
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

Poniższy przykład SQL przekazuje opcje jako mapę par klucz-wartość ciągu. orders_snapshot_path powinna być dostępna jako zmienna SQL (na przykład zdefiniowana przy użyciu parametrów potoku lub ręcznie interpolowana).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Ten krok tworzy drugi widok, który odczytuje dane dotyczące przyrostowej zmiany (na przykład z dzienników CDC lub tabel zmian). Czyta z orders_cdc_path i zakłada, że pliki JSON w stylu CDC są regularnie umieszczane na tej ścieżce.

Python

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

W poniższym przykładzie SQL ${orders_cdc_path} jest zmienną, którą można interpolować, ustawiając jej wartość w ustawieniach potoku lub jawnie w kodzie.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

Początkowe nawodnienie (po rozpoczęciu przepływu)

Po skonfigurowaniu AUTO CDC źródeł logika scala oba źródła w docelową tabelę przesyłania strumieniowego. Najpierw użyj jednorazowego przepływu AUTO CDC z ONCE=TRUE, aby skopiować pełną zawartość tabeli RDBMS do tabeli strumieniowej. Spowoduje to przygotowanie tabeli docelowej z danymi historycznymi bez ponownego ich odtworzenia w przyszłych aktualizacjach.

Python

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Przepływ once jest uruchamiany tylko raz. Nowe pliki dodawane do full_orders_snapshot po utworzeniu potoku są ignorowane.

Ważne

Wykonanie pełnego odświeżenia w tabeli przesyłania strumieniowego rdbms_orders powoduje ponowne uruchomienie once procesu. Jeśli początkowe dane migawki w magazynie w chmurze zostały usunięte, spowoduje to utratę danych.

Ciągły kanał zmian (przepływ zmian)

Po początkowym załadowaniu migawki użyj innego AUTO CDC przepływu, aby stale pobierać zmiany z kanału CDC RDBMS. Dzięki temu tabela rdbms_orders jest na bieżąco aktualizowana poprzez wstawienia, aktualizacje i usunięcia.

Python

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Rozważania

Idempotentność uzupełniania danych Przepływ once jest uruchamiany ponownie tylko wtedy, gdy tabela docelowa jest w pełni odświeżona.
Wiele przepływów Można użyć wielu przepływów zmian, aby scalić korekty, dane opóźnione lub alternatywne źródła danych, ale wszystkie muszą udostępniać schemat i klucze.
Pełne odświeżanie Pełne odświeżenie tabeli przesyłania strumieniowego rdbms_orders powoduje ponowne wykonanie przepływu once. Może to prowadzić do utraty danych, jeśli początkowa lokalizacja magazynu w chmurze usunęła początkowe pliki migawek.
Kolejność wykonywania przepływu Kolejność wykonania przepływu procesów nie ma znaczenia. Wynik końcowy jest taki sam.

Dodatkowe zasoby