Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Replicar uma tabela RDBMS externa usando
Esta página orienta você sobre como replicar uma tabela de um sistema de gerenciamento de banco de dados relacional externo (RDBMS) no Azure Databricks usando a AUTO CDC API em pipelines. Você aprenderá:
- Padrões comuns para configurar as fontes.
- Como executar uma cópia completa única dos dados existentes usando um
oncefluxo. - Como assimilar de forma contínua alterações novas usando um
changefluxo.
Esse padrão é ideal para criar tabelas de dimensões que mudam lentamente (SCD) ou manter uma tabela de destino sincronizada com um sistema externo de registro.
Antes de começar
Este guia pressupõe que você tenha acesso aos seguintes conjuntos de dados de sua origem:
- Um instantâneo completo da tabela de origem no armazenamento em nuvem. Este conjunto de dados é usado para a carga inicial.
- Um feed de alteração contínua, preenchido no mesmo local de armazenamento em nuvem (por exemplo, usando Debezium, Kafka ou CDC baseado em log). Este feed é a entrada para o processo em curso
AUTO CDC.
Configurar visualizações de código-fonte
Primeiro, defina duas visualizações de origem para preencher a rdbms_orders tabela de destino a partir de um caminho orders_snapshot_pathde armazenamento em nuvem. Ambos são construídos como visualizações de streaming sobre dados brutos no armazenamento em nuvem. O uso de modos de exibição proporciona maior eficiência porque os dados não precisam ser gravados antes de serem usados no AUTO CDC processo.
- A primeira visualização de origem é um instantâneo completo (
full_orders_snapshot) - O segundo é um feed de mudança contínua (
rdbms_orders_change_feed).
Os exemplos neste guia usam o armazenamento em nuvem como fonte, mas você pode usar qualquer fonte suportada por tabelas de streaming.
full_orders_snapshot()
Esta etapa cria um pipeline com uma vista que lê a captura inicial completa dos dados dos pedidos.
Python
O seguinte exemplo de Python:
- Utilize
spark.readStreamcom Auto Loader (format("cloudFiles")) - Lê arquivos JSON de um diretório definido por
orders_snapshot_path - Defina
includeExistingFilesparatruepara assegurar que os dados históricos já presentes no trajeto sejam processados - Defina
inferColumnTypesparatruea fim de inferir o esquema automaticamente - Devolve todas as colunas com
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
O exemplo SQL a seguir passa opções como um mapa de pares chave-valor de strings.
orders_snapshot_path deve estar disponível como uma variável SQL (por exemplo, definida usando parâmetros de pipeline ou interpolada manualmente).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Esta etapa cria uma segunda exibição que lê dados de alteração incremental (por exemplo, de logs CDC ou tabelas de alteração). O programa lê a partir de orders_cdc_path e assume que os ficheiros JSON ao estilo CDC são colocados neste caminho regularmente.
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
No exemplo SQL a seguir, ${orders_cdc_path} é uma variável e pode ser interpolada definindo um valor em suas configurações de pipeline ou definindo explicitamente uma variável em seu código.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hidratação inicial (após o início do fluxo)
Agora que as fontes estão configuradas, AUTO CDC a lógica mescla ambas as fontes em uma tabela de streaming de destino. Primeiro, use um fluxo único AUTO CDC com ONCE=TRUE para copiar todo o conteúdo da tabela RDBMS em uma tabela de streaming. Isso prepara a tabela de destino com dados históricos sem reproduzi-los em atualizações futuras.
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
O once fluxo só é executado uma vez. Novos arquivos que são adicionados a full_orders_snapshot após a criação do pipeline são ignorados.
Importante
Executar uma atualização completa na tabela de streaming rdbms_orders faz com que o fluxo once seja executado novamente. Se os dados de instantâneo iniciais no armazenamento em nuvem tiverem sido removidos, isso resultará em perda de dados.
Canal de alterações contínuas (fluxo de alterações)
Após a carga inicial do snapshot, use outro AUTO CDC fluxo para ingerir continuamente as alterações do feed CDC do RDBMS. Isso mantém sua rdbms_orders tabela atualizada com inserções, atualizações e exclusões.
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Considerações
| Idempotência de retropreenchimento | Um once fluxo só é executado novamente quando a tabela de destino é totalmente atualizada. |
|---|---|
| Fluxos múltiplos | Você pode usar vários fluxos de alteração para mesclar correções, dados que chegam atrasados ou feeds alternativos, mas todos devem compartilhar um esquema e chaves. |
| Atualização completa | Um recarregamento completo da rdbms_orders tabela de streaming reexecuta o once fluxo. Isso pode levar à perda de dados se o local de armazenamento em nuvem inicial tiver removido os dados de instantâneo iniciais. |
| Ordem de execução de fluxo | A ordem de execução do fluxo não importa. O resultado final é o mesmo. |
Recursos adicionais
- Conector SQL Server totalmente gerenciado no Lakeflow Connect