Partilhar via


Replicar uma tabela RDBMS externa usando AUTO CDC

Esta página orienta você sobre como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) no Azure Databricks usando a AUTO CDC API em pipelines. Você aprenderá:

  • Padrões comuns para configurar as fontes.
  • Como executar uma cópia completa única dos dados existentes usando um once fluxo.
  • Como assimilar de forma contínua alterações novas usando um change fluxo.

Esse padrão é ideal para criar tabelas de dimensões que mudam lentamente (SCD) ou manter uma tabela de destino sincronizada com um sistema externo de registro.

Antes de começar

Este guia pressupõe que você tenha acesso aos seguintes conjuntos de dados de sua origem:

  • Um instantâneo completo da tabela de origem no armazenamento em nuvem. Este conjunto de dados é usado para a carga inicial.
  • Um feed de alteração contínua, preenchido no mesmo local de armazenamento em nuvem (por exemplo, usando Debezium, Kafka ou CDC baseado em log). Este feed é a entrada para o processo em curso AUTO CDC .

Configurar visualizações de código-fonte

Primeiro, defina duas visualizações de origem para preencher a rdbms_orders tabela de destino a partir de um caminho orders_snapshot_pathde armazenamento em nuvem. Ambos são construídos como visualizações de streaming sobre dados brutos no armazenamento em nuvem. O uso de modos de exibição proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no AUTO CDC processo.

  • A primeira visualização de origem é um instantâneo completo (full_orders_snapshot)
  • O segundo é um feed de mudança contínua (rdbms_orders_change_feed).

Os exemplos neste guia usam o armazenamento em nuvem como fonte, mas você pode usar qualquer fonte suportada por tabelas de streaming.

full_orders_snapshot()

Esta etapa cria um pipeline com uma vista que lê a captura inicial completa dos dados dos pedidos.

Python

O seguinte exemplo de Python:

  • Utilize spark.readStream com Auto Loader (format("cloudFiles"))
  • Lê arquivos JSON de um diretório definido por orders_snapshot_path
  • Defina includeExistingFiles para true para assegurar que os dados históricos já presentes no trajeto sejam processados
  • Defina inferColumnTypes para true a fim de inferir o esquema automaticamente
  • Devolve todas as colunas com .select("\*")
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

O exemplo SQL a seguir passa opções como um mapa de pares chave-valor de strings. orders_snapshot_path deve estar disponível como uma variável SQL (por exemplo, definida usando parâmetros de pipeline ou interpolada manualmente).

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

Esta etapa cria uma segunda exibição que lê dados de alteração incremental (por exemplo, de logs CDC ou tabelas de alteração). O programa lê a partir de orders_cdc_path e assume que os ficheiros JSON ao estilo CDC são colocados neste caminho regularmente.

Python

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

No exemplo SQL a seguir, ${orders_cdc_path} é uma variável e pode ser interpolada definindo um valor em suas configurações de pipeline ou definindo explicitamente uma variável em seu código.

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

Hidratação inicial (após o início do fluxo)

Agora que as fontes estão configuradas, AUTO CDC a lógica mescla ambas as fontes em uma tabela de streaming de destino. Primeiro, use um fluxo único AUTO CDC com ONCE=TRUE para copiar todo o conteúdo da tabela RDBMS em uma tabela de streaming. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.

Python

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

O once fluxo só é executado uma vez. Novos arquivos que são adicionados a full_orders_snapshot após a criação do pipeline são ignorados.

Importante

Executar uma atualização completa na tabela de streaming rdbms_orders faz com que o fluxo once seja executado novamente. Se os dados de instantâneo iniciais no armazenamento em nuvem tiverem sido removidos, isso resultará em perda de dados.

Canal de alterações contínuas (fluxo de alterações)

Após a carga inicial do snapshot, use outro AUTO CDC fluxo para ingerir continuamente as alterações do feed CDC do RDBMS. Isso mantém sua rdbms_orders tabela atualizada com inserções, atualizações e exclusões.

Python

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

Considerações

Idempotência de retropreenchimento Um once fluxo só é executado novamente quando a tabela de destino é totalmente atualizada.
Fluxos múltiplos Você pode usar vários fluxos de alteração para mesclar correções, dados que chegam atrasados ou feeds alternativos, mas todos devem compartilhar um esquema e chaves.
Atualização completa Um recarregamento completo da rdbms_orders tabela de streaming reexecuta o once fluxo. Isso pode levar à perda de dados se o local de armazenamento em nuvem inicial tiver removido os dados de instantâneo iniciais.
Ordem de execução de fluxo A ordem de execução do fluxo não importa. O resultado final é o mesmo.

Recursos adicionais