Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Replicación de una tabla RDBMS externa mediante
En esta página se explica cómo replicar una tabla desde un sistema de administración de bases de datos relacionales (RDBMS) externo en Azure Databricks mediante la AUTO CDC API en canalizaciones. Aprenderá lo siguiente:
- Patrones comunes para configurar los orígenes.
- Cómo realizar una copia completa de los datos existentes para un uso único mediante un flujo
once. - Cómo ingerir continuamente nuevos cambios mediante un
changeflujo.
Este patrón es ideal para crear tablas de dimensiones de variación lenta (SCD) o mantener una tabla de destino sincronizada con un sistema externo de registro.
Antes de empezar
En esta guía se da por supuesto que tiene acceso a los siguientes conjuntos de datos de su fuente:
- Instantánea completa de la tabla de origen en el almacenamiento en la nube. Este conjunto de datos se usa para la carga inicial.
- Una fuente de cambios continua, poblada en la misma ubicación de almacenamiento en la nube (por ejemplo, utilizando Debezium, Kafka o CDC basado en registros). Esta fuente es la entrada del proceso en curso
AUTO CDC.
Configuración de vistas de origen
En primer lugar, defina dos vistas de origen para rellenar la tabla de destino rdbms_orders desde una ruta orders_snapshot_path de acceso de almacenamiento en la nube. Ambos se crean como vistas de streaming sobre datos sin procesar en el almacenamiento en la nube. El uso de vistas proporciona una mayor eficacia porque los datos no tienen que escribirse antes de usarlos en el AUTO CDC proceso.
- La primera vista de origen es una instantánea completa (
full_orders_snapshot) - El segundo es una fuente de cambios continua (
rdbms_orders_change_feed).
En los ejemplos de esta guía se usa el almacenamiento en la nube como origen, pero puede usar cualquier origen compatible con las tablas de streaming.
full_orders_snapshot()
Este paso crea una canalización con una vista que lee la instantánea completa inicial de los datos de pedidos.
Pitón
El siguiente ejemplo de Python:
- Usa
spark.readStreamcon el cargador automático (format("cloudFiles")) - Lee los archivos JSON de un directorio definido por
orders_snapshot_path - Establece
includeExistingFilesentruepara asegurarse de que los datos históricos que ya están presentes en la ruta de acceso sean procesados. - Establece
inferColumnTypesentruepara deducir automáticamente el esquema. - Devuelve todas las columnas con
.select("\*")
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
En el siguiente ejemplo de SQL se pasan opciones como un mapa de pares clave-valor de cadena.
orders_snapshot_path debe estar disponible como una variable SQL (por ejemplo, definida mediante parámetros de canalización o interpoladas manualmente).
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
Este paso crea una segunda vista que lee los datos de cambios incrementales (por ejemplo, de registros CDC o tablas de cambios). Lee desde orders_cdc_path y supone que los archivos JSON con estilo CDC se colocan regularmente en esta ruta de acceso.
Pitón
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
En el siguiente ejemplo de SQL, ${orders_cdc_path} es una variable y se puede interpolar estableciendo un valor en la configuración de canalización o estableciendo explícitamente una variable en el código.
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
Hidratación inicial (una vez flujo)
Ahora que los orígenes están configurados, AUTO CDC la lógica combina ambos orígenes en una tabla de streaming de destino. En primer lugar, use un flujo único AUTO CDC con ONCE=TRUE para copiar el contenido completo de la tabla RDBMS en una tabla de streaming. Esto prepara la tabla de destino con datos históricos sin reproducirla en futuras actualizaciones.
Pitón
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
El once flujo solo se ejecuta una vez. Se omiten los nuevos archivos que se agregan a full_orders_snapshot después de crear la canalización.
Importante
Al realizar una actualización completa en la tabla de rdbms_orders streaming, se ejecuta nuevamente el flujo once. Si se han quitado los datos de instantánea iniciales en el almacenamiento en la nube, esto da lugar a la pérdida de datos.
Fuente de cambios continua (flujo de cambios)
Después de la carga inicial de la instantánea, use otro AUTO CDC flujo para la ingestión continua de los cambios de la alimentación CDC del RDBMS. Esto mantiene rdbms_orders actualizada la tabla con inserciones, actualizaciones y eliminaciones.
Pitón
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
Consideraciones
| Idempotencia de relleno | Un once flujo solo se vuelve a ejecutar cuando la tabla de destino está totalmente actualizada. |
|---|---|
| Varios flujos | Puede usar varios flujos de cambio para combinar correcciones, datos de llegada tardía o fuentes alternativas, pero todos deben compartir un esquema y claves. |
| Actualización completa | Una actualización completa de la tabla de rdbms_orders streaming reejecuta el flujo once. Esto puede provocar la pérdida de datos si la ubicación de almacenamiento en la nube inicial ha quitado los datos de instantánea iniciales. |
| Orden de ejecución de flujo | El orden de ejecución del flujo no importa. El resultado final es el mismo. |
Recursos adicionales
- Conector de SQL Server totalmente administrado en Lakeflow Connect