Condividi tramite


Replicare una tabella RDBMS esterna usando AUTO CDC

Questa pagina illustra come replicare una tabella da un sistema di gestione di database relazionali (RDBMS) esterno in Azure Databricks usando l'API AUTO CDC nelle pipeline. Imparerai:

  • Modelli comuni per la configurazione delle origini.
  • Come eseguire una copia completa una tantum dei dati esistenti usando un once flusso.
  • Come inserire continuamente nuove modifiche usando un change flusso.

Questo modello è ideale per la creazione di tabelle di dimensioni a modifica lenta (SCD) o per mantenere sincronizzata una tabella di destinazione con un sistema esterno di record.

Prima di iniziare

Questa guida presuppone che l'utente abbia accesso ai set di dati seguenti dall'origine:

  • Snapshot completo della tabella di origine nell'archiviazione su cloud. Questo set di dati viene usato per il caricamento iniziale.
  • Feed di modifiche continuo, popolato nella stessa posizione di archiviazione cloud, ad esempio usando Debezium, Kafka o CDC basato su log. Questo flusso è l'input per il processo in corso AUTO CDC.

Configurare le viste di origine

Prima, definisci due viste sorgente per riempire la tabella di destinazione rdbms_orders da un percorso di archiviazione cloud orders_snapshot_path. Entrambi sono creati come visualizzazioni di streaming sui dati non elaborati nell'archiviazione nel cloud. L'uso delle viste garantisce un'efficienza maggiore perché i dati non devono essere scritti prima dell'uso AUTO CDC nel processo.

  • La prima vista di origine è uno snapshot completo (full_orders_snapshot)
  • Il secondo è un feed di modifiche continuo (rdbms_orders_change_feed).

Gli esempi in questa guida usano l'archiviazione cloud come origine, ma è possibile usare qualsiasi origine supportata dalle tabelle di streaming.

full_orders_snapshot()

Questo passaggio crea una pipeline con una vista che legge lo snapshot completo iniziale dei dati degli ordini.

Pitone

L'esempio python seguente:

  • Usa spark.readStream con il caricatore automatico (format("cloudFiles"))
  • Legge i file JSON da una directory definita da orders_snapshot_path
  • Imposta includeExistingFiles a true per garantire che i dati cronologici già presenti nel percorso vengano elaborati
  • Imposta inferColumnTypes a true per dedurre automaticamente lo schema
  • Restituisce tutte le colonne con .select("\*")
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

L'esempio SQL seguente passa le opzioni come mappa di coppie chiave-valore stringa. orders_snapshot_path deve essere disponibile come variabile SQL( ad esempio, definita usando parametri della pipeline o interpolata manualmente).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Questo passaggio crea una seconda visualizzazione che legge i dati delle modifiche incrementali, ad esempio dai log CDC o dalle tabelle delle modifiche. Legge da orders_cdc_path e presuppone che i file JSON in stile CDC vengano inseriti regolarmente in questo percorso.

Pitone

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

Nell'esempio SQL seguente è ${orders_cdc_path} una variabile e può essere interpolata impostando un valore nelle impostazioni della pipeline o impostando in modo esplicito una variabile nel codice.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

Idratazione iniziale (al primo flusso)

Ora che le origini sono configurate, AUTO CDC la logica unisce entrambe le origini in una tabella di streaming di destinazione. Per prima cosa, usare un flusso monouso AUTO CDC con ONCE=TRUE per copiare il contenuto completo della tabella RDBMS in una tabella di streaming. In questo modo la tabella di destinazione viene preparata con i dati cronologici senza reinserirli negli aggiornamenti futuri.

Pitone

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Il once flusso viene eseguito una sola volta. I nuovi file aggiunti a full_orders_snapshot dopo la creazione della pipeline vengono ignorati.

Importante

L'esecuzione di un aggiornamento completo sulla tabella di streaming rdbms_orders esegue il flusso once nuovamente. Se i dati dello snapshot iniziale nell'archiviazione cloud sono stati rimossi, ciò comporta la perdita di dati.

Flusso di modifiche continuo

Dopo il caricamento iniziale dello snapshot, usare un altro AUTO CDC flusso per inserire continuamente le modifiche dal feed CDC di RDBMS. In questo modo la rdbms_orders tabella viene aggiornata con inserimenti, aggiornamenti ed eliminazioni.

Pitone

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Considerazioni

Backfill e idempotenza Un once flusso viene eseguito di nuovo solo quando la tabella di destinazione viene completamente aggiornata.
Più flussi È possibile usare più flussi di modifica per unire le correzioni, i dati in arrivo in ritardo o i feed alternativi, ma tutti devono condividere uno schema e le chiavi.
Aggiornamento completo Un aggiornamento completo della tabella di streaming rdbms_orders riesegue il flusso once. Ciò può causare la perdita di dati se la posizione iniziale di archiviazione cloud ha eliminato i dati dello snapshot iniziale.
Ordine di esecuzione del flusso L'ordine di esecuzione del flusso non è importante. Il risultato finale è lo stesso.

Risorse aggiuntive