Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Las canalizaciones declarativas de Spark de Lakeflow simplifican la captura de datos modificados (CDC) con las API AUTO CDC y AUTO CDC FROM SNAPSHOT. Estas API automatizan la complejidad de la computación de dimensiones de variación lenta (SCD) tipo 1 y tipo 2 desde una fuente CDC o instantáneas de base de datos. Para más información sobre estos conceptos, consulte Captura de datos modificados e instantáneas.
Nota:
Las AUTO CDC API reemplazan las APPLY CHANGES API y tienen la misma sintaxis. Las APPLY CHANGES API siguen estando disponibles, pero Databricks recomienda usar las AUTO CDC API en su lugar.
La API que use depende del origen de los datos modificados:
-
AUTO CDC: Utilice esto cuando la base de datos de origen tenga una fuente CDC habilitada.AUTO CDCprocesa los cambios de un flujo de datos de cambios (CDF). Se admite en las interfaces SQL y Python de canalización. -
AUTO CDC FROM SNAPSHOT: Use esto cuando CDC no está habilitado en la base de datos de origen y solo hay instantáneas disponibles. Esta API compara las instantáneas para determinar los cambios y, a continuación, los procesa. Solo se admite en la interfaz de Python.
Ambas API admiten la actualización de tablas mediante SCD Tipo 1 y Tipo 2:
- Use SCD Type 1 para actualizar los registros directamente. El historial no se conserva para los registros actualizados.
- Use SCD Type 2 para conservar un historial de registros, ya sea en todas las actualizaciones o en actualizaciones de un conjunto de columnas especificado.
Las AUTO CDC API no son compatibles con las canalizaciones declarativas de Apache Spark.
Para consultar la sintaxis y otras referencias, consulte AUTO CDC INTO (pipelines), create_auto_cdc_flow y create_auto_cdc_from_snapshot_flow.
Nota:
En esta página se describe cómo actualizar las tablas de las canalizaciones en función de los cambios en los datos de origen. Para obtener información sobre cómo registrar y consultar información de cambios de nivel de fila para tablas Delta, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.
Requisitos
Para usar las API de CDC, la canalización debe configurarse para usar SDP sin servidor o Pro las Advancedediciones SDP.
Funcionamiento de AUTO CDC
Para realizar el procesamiento CDC con AUTO CDC, cree una tabla de streaming y, a continuación, use la instrucción AUTO CDC ... INTO en SQL o la función create_auto_cdc_flow() en Python para especificar el origen, las claves y la secuenciación para el feed de cambios. Para una explicación de cómo funcionan la secuenciación y la lógica SCD, consulte Captura de datos modificados e instantáneas. Consulte los ejemplos de AUTO CDC.
Para la hidratación inicial desde una fuente con un feed de cambios, use AUTO CDC con un flujo de once y, a continuación, continúe procesando el feed de cambios. Consulte Replicación de una tabla RDBMS externa mediante AUTO CDC.
Para más información sobre la sintaxis, consulte AUTO CDC INTO (pipelines) o create_auto_cdc_flow.
Funcionamiento de AUTO CDC FROM SNAPSHOT
AUTO CDC FROM SNAPSHOT determina los cambios en los datos de origen comparando instantáneas en orden. Solo se admite en la interfaz de canalización de Python. Puede leer instantáneas de una tabla Delta, archivos de almacenamiento en la nube o JDBC directamente.
Para realizar el procesamiento CDC con AUTO CDC FROM SNAPSHOT, cree una tabla de streaming y luego use la función create_auto_cdc_from_snapshot_flow() para especificar la instantánea, las claves y otros argumentos. Para más información sobre los dos patrones de ingesta y cuándo usar cada uno, consulte Patrones de procesamiento de instantáneas. Consulte los ejemplos de AUTO CDC FROM SNAPSHOT.
Para obtener más información sobre la sintaxis, consulte create_auto_cdc_from_snapshot_flow.
Uso de varias columnas para la secuenciación
Para secuenciar por varias columnas (por ejemplo, una marca de tiempo y un identificador para resolver empates), use un STRUCT para combinarlas. La API ordena primero por el primer campo y, en caso de empate, tiene en cuenta el segundo campo, etc.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Pitón
sequence_by = struct("timestamp_col", "id_col")
Ejemplos de AUTO CDC
En los ejemplos siguientes se muestra el procesamiento SCD Tipo 1 y Tipo 2 mediante una fuente de alimentación de datos de cambios. Los datos de ejemplo crean nuevos registros de usuario, eliminan un registro de usuario y actualizan los registros de usuario. En el ejemplo del tipo 1 de SCD, las últimas UPDATE operaciones llegan tarde y se quitan de la tabla de destino, lo que muestra el control de eventos desordenados.
A continuación se muestran los registros de entrada usados en estos ejemplos. Estos datos se crean mediante la ejecución de la consulta en la sección Crear datos de ejemplo .
| userId | nombre | city | operation | secuenciaNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancún | INSERT | 2 |
| 123 | nulo | nulo | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Si eliminas el comentario de la última línea de la consulta de generación de datos de ejemplo, inserta el siguiente registro que especifica vaciar la tabla en sequenceNum=3:
| userId | nombre | city | operation | secuenciaNum |
|---|---|---|---|---|
| nulo | nulo | nulo | TRUNCAR | 3 |
Nota:
Todos los ejemplos siguientes incluyen opciones para especificar las DELETE operaciones y TRUNCATE , pero cada una es opcional.
Creación de datos de ejemplo
Ejecute las instrucciones siguientes para crear un conjunto de datos de ejemplo. Este código no está pensado para ejecutarse como parte de una definición de canalización. Ejecútelo desde la carpeta de exploración de la canalización, en lugar de la carpeta de transformaciones.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Procesar actualizaciones del tipo 1 de SCD
SCD Type 1 mantiene solo la versión más reciente de cada registro. En el ejemplo siguiente se lee del flujo de datos de cambios creado anteriormente y se aplican los cambios a una tabla de destino de transmisión. Desarrolle canalizaciones declarativas de Spark de Lakeflow para ejecutar este código.
Pitón
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Después de ejecutar el ejemplo de SCD Type 1, la tabla de destino contiene los siguientes registros:
| userId | nombre | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancún |
Se eliminó el usuario 123 (Isabel) y no aparece. El usuario 125 (Mercedes) muestra solo la ciudad más reciente (Guadalajara) porque SCD Type 1 sobrescribe los valores anteriores. El UPDATE anterior en sequenceNum=5 se eliminó porque llegó una actualización posterior en sequenceNum=6.
Después de ejecutar el ejemplo con el TRUNCATE registro descomentado, la tabla se elimina en sequenceNum=3. Esto significa que los registros 124 y 126 no están en la tabla y la tabla de destino final contiene solo el registro siguiente:
| userId | nombre | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Procesar actualizaciones del tipo 2 de SCD
ScD Type 2 conserva un historial completo de cambios mediante la creación de nuevas filas para cada versión de un registro, con __START_AT columnas y __END_AT que indican cuándo estaba activa cada versión.
Pitón
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Después de ejecutar el ejemplo scD type 2, la tabla de destino contiene los siguientes registros:
| userId | nombre | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | nulo |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | nulo |
| 126 | Lily | Cancún | 2 | nulo |
La tabla conserva el historial completo. El usuario 123 tiene dos versiones (finalizó en la secuencia 6 cuando se eliminó). El usuario 125 tiene tres versiones que muestran los cambios en la ciudad. Los registros con __END_AT = null están activos actualmente.
Rastrear un subconjunto de columnas con SCD Type 2
De forma predeterminada, SCD Type 2 crea una nueva versión cada vez que cambia cualquier valor de columna. Puede especificar un subconjunto de columnas para realizar un seguimiento, de modo que los cambios realizados en otras columnas actualicen la versión actual en lugar de generar un nuevo registro de historial.
En el ejemplo siguiente se excluye la city columna del seguimiento del historial:
Pitón
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Como no se realiza un seguimiento de los cambios en city, las actualizaciones de la ciudad sobrescriben la fila existente en lugar de crear una nueva versión. La tabla de destino contiene los siguientes registros:
| userId | nombre | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | nulo |
| 125 | Mercedes | Guadalajara | 2 | nulo |
| 126 | Lily | Cancún | 2 | nulo |
Ejemplos de AUTO CDC FROM SNAPSHOT
En las secciones siguientes se proporcionan ejemplos de cómo usar AUTO CDC FROM SNAPSHOT para procesar instantáneas en tablas de destino de Tipo 1 o Tipo 2 de SCD. Para obtener información sobre cuándo usar esta API, consulte Captura de datos modificados e instantáneas.
Ejemplo: Procesamiento de instantáneas mediante el tiempo de ingesta de canalización
Use este enfoque cuando las instantáneas lleguen periódicamente y en orden, y puede confiar en la marca de tiempo de ejecución de la canalización para el control de versiones. Se ingiere una nueva instantánea con cada actualización de canalización.
Puede leer instantáneas de varios tipos de origen, incluidas tablas Delta, archivos de almacenamiento en la nube y conexiones JDBC.
Paso 1: Crear datos de ejemplo
Cree una tabla que contenga datos de instantáneas. Ejecute el código siguiente desde un cuaderno o Databricks SQL en la carpeta de la explorations canalización:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Paso 2: Ejecutar 'AUTO CDC DESDE SNAPSHOT'
Desarrolle canalizaciones declarativas de Spark de Lakeflow para ejecutar el código en este paso.
Elija un tipo de origen para la vista de instantáneas (el código de creación de ejemplo genera una tabla Delta):
Opción A: Leer desde una tabla Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opción B: Leer desde el almacenamiento en la nube
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opción C: Lectura desde JDBC (solo proceso clásico)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Todas las opciones, escribir en el destino
A continuación, agregue la tabla de destino y el flujo:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Después de la primera ejecución del proceso de la canalización, todos los registros se insertan como filas activas.
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | nulo |
| 2 | Monterrey | 0 | nulo |
| 3 | Tijuana | 0 | nulo |
Nota:
Para usar scD type 1 en su lugar y mantener solo el estado actual, establezca stored_as_scd_type=1. En este caso, la tabla de destino no incluye ni __START_AT ni __END_AT columnas.
Paso 3: Simular una nueva instantánea y volver a ejecutar
Actualice la tabla de origen para simular una nueva instantánea que llegue (ejecute este código desde un cuaderno o un archivo SQL en la explorations carpeta de la pipline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Repetición de la ejecución de la canalización
AUTO CDC FROM SNAPSHOT compara la nueva instantánea con la anterior y detecta que se eliminó el usuario 1, se actualizaron los usuarios 2 y 3 y se insertaron los usuarios 4 y 6. Esto genera una fuente de cambios y usa AUTO CDC para crear la tabla de salida.
Después de la segunda ejecución con SCD Type 2, la tabla de destino contiene los siguientes registros:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | nulo |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Ángeles | 1 | nulo |
| 4 | Valle de la muerte | 1 | nulo |
| 6 | Kings Canyon | 1 | nulo |
El usuario 1 finalizó (se eliminó). Los usuarios 2 y 3 tienen dos versiones que muestran sus cambios en la ciudad. Los usuarios 4 y 6 se insertaron recientemente.
Después de la segunda ejecución con SCD Type 1, la tabla de destino solo muestra el estado actual:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Ángeles |
| 4 | Valle de la muerte |
| 6 | Kings Canyon |
Ejemplo: Procesamiento de instantáneas mediante funciones de versión
Use este enfoque cuando necesite un control explícito sobre el orden de instantáneas. Por ejemplo, use este enfoque cuando llegan varias instantáneas al mismo tiempo o las instantáneas llegan fuera de orden. Escriba una función que especifique qué instantánea procesará a continuación y su número de versión. La API procesa instantáneas en orden ascendente de versiones:
- Si hay varias instantáneas en el almacenamiento, todas se procesan en orden.
- Si una instantánea llega fuera de orden (por ejemplo,
snapshot_3llega después desnapshot_4), se omite. - Si no hay nuevas instantáneas, la función devuelve
Noney no se produce ningún procesamiento.
Paso 1: Preparación de archivos de instantánea
Crear archivos CSV que contengan datos de instantáneas y agregarlos a una ubicación de almacenamiento en un volumen o en la nube. Asigne un nombre cronológico a los archivos (por ejemplo, snapshot_1.csv, snapshot_2.csv).
Cada archivo debe contener columnas para userId y city. Por ejemplo:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Ángeles |
| 4 | Valle de la muerte |
Paso 2: Ejecución de AUTO CDC FROM SNAPSHOT con una función de versión
Cree un cuaderno y pegue el código de canalización siguiente. Desarrolle las canalizaciones declarativas de Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Nota:
Para usar scD type 1 en su lugar, establezca stored_as_scd_type=1.
Después de procesar snapshot_1.csv, la tabla de destino contiene los siguientes registros:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | nulo |
| 2 | Monterrey | 1 | nulo |
| 3 | Tijuana | 1 | nulo |
Después de procesar snapshot_2.csv, la tabla de destino contiene los siguientes registros:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | nulo |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Ángeles | 2 | nulo |
| 4 | Valle de la muerte | 2 | nulo |
Nota:
Recuerde que, para SCD Type 1, la tabla es exactamente similar a la instantánea más reciente. La diferencia es que las consultas posteriores pueden usar el flujo de cambios para procesar solo los registros modificados.
Paso 3: Agregar nuevas instantáneas
Agregue un nuevo archivo CSV a la ubicación de almacenamiento con datos modificados (por ejemplo, valores de ciudad modificados, filas nuevas o filas eliminadas). Después, vuelva a ejecutar la canalización para procesar la nueva instantánea.
Limitaciones
- La columna de secuenciación debe ser un tipo de datos ordenable.
NULLNo se admiten valores de secuenciación. -
AUTO CDC FROM SNAPSHOTsolo se admite en la interfaz de canalización de Python; no se admite la interfaz SQL.
Recursos adicionales
- Captura de datos cambiantes y instantáneas: obtenga información sobre los conceptos de CDC, las instantáneas y los tipos de SCD.
-
Replicación de una tabla RDBMS externa mediante
AUTO CDC: obtenga información sobre cómo realizar la hidratación inicial con unonceflujo y, a continuación, continuar procesando los cambios. - Temas avanzados de AUTO CDC: obtenga información sobre las operaciones de cambio en los objetivos de AUTO CDC, la lectura de fuentes de datos de cambio y el procesamiento de métricas.
- Tutorial: Compilación de una canalización de ETL mediante la captura de datos modificados