API APPLY CHANGES: simplificar la captura de datos modificados con Delta Live Tables
Delta Live Tables simplifica la captura de datos modificados (CDC) con las API APPLY CHANGES
y APPLY CHANGES FROM SNAPSHOT
. La interfaz que use depende del origen de los datos modificados:
- Use
APPLY CHANGES
para procesar los cambios de una fuente de datos de cambios (CDF). - Use
APPLY CHANGES FROM SNAPSHOT
(versión preliminar pública) para procesar los cambios en las instantáneas de base de datos.
Anteriormente, la MERGE INTO
instrucción se usaba normalmente para procesar registros CDC en Azure Databricks. Sin embargo, MERGE INTO
puede generar resultados incorrectos debido a registros fuera de secuencia o requerir lógica compleja para volver a ordenar los registros.
La API APPLY CHANGES
se admite en el SQL de Delta Live Tables y en las interfaces de Python. La API APPLY CHANGES FROM SNAPSHOT
se admite en la interfaz de Python de Delta Live Tables.
Tanto APPLY CHANGES
como APPLY CHANGES FROM SNAPSHOT
admiten la actualización de tablas mediante el SCD de tipo 1 y tipo 2:
- Use SCD de tipo 1 para actualizar los registros directamente. El historial no se conserva para los registros actualizados.
- Use el SCD de tipo 2 para conservar un historial de registros, ya sea en todas las actualizaciones o en actualizaciones de un conjunto especificado de columnas.
Para ver la sintaxis y otras referencias, consulte:
- Captura de datos modificados de una fuente de cambios con Python en Delta Live Tables
- Captura de datos modificados con SQL en Delta Live Tables
Nota:
En este artículo se describe cómo actualizar tablas en la canalización de Delta Live Tables en función de los cambios en los datos de origen. Para aprender a registrar y consultar información de cambio a nivel de fila para las tablas Delta, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.
Requisitos
Para usar las API CDC, la canalización debe configurarse para usar canalizaciones DLT sin servidor o Delta Live Tables Pro
o Advanced
ediciones.
¿Cómo se implementa la CDC con la API APPLY CHANGES
?
Al controlar automáticamente los registros fuera de secuencia, la API de APPLY CHANGES
en Delta Live Tables garantiza el procesamiento correcto de los registros CDC y elimina la necesidad de desarrollar lógica compleja para controlar registros fuera de secuencia. Se debe especificar una columna en los datos de origen sobre los que se secuenciarán los registros, que Delta Live Tables interpreta como una representación que crece de forma regular de la ordenación adecuada de los datos de origen. Delta Live Tables controla automáticamente los datos que llegan desordenados. Para los cambios de SCD de tipo 2, Delta Live Tables propaga los valores de secuenciación adecuados a las columnas de __START_AT
y __END_AT
de la tabla de destino. Debería haber una actualización distinta por clave en cada valor de secuenciación y no se admiten valores de secuenciación NULL.
Para realizar el procesamiento de CDC con APPLY CHANGES
, primero debe crear una tabla de streaming y, a continuación, usar la instrucción APPLY CHANGES INTO
en SQL o la función apply_changes()
de Python para especificar el origen, las claves y la secuenciación de la fuente de cambios. Para crear la tabla de streaming de destino, use la instrucción CREATE OR REFRESH STREAMING TABLE
en SQL o la función create_streaming_table()
en Python. Consulte los ejemplos de procesamiento de SCD de tipo 1 y tipo 2.
Para obtener más información sobre la sintaxis, consulte la referencia de SQL o la referencia de Python de Delta Live Tables.
¿Cómo se implementa la CDC con la API APPLY CHANGES FROM SNAPSHOT
?
Importante
La API APPLY CHANGES FROM SNAPSHOT
está en versión preliminar pública.
APPLY CHANGES FROM SNAPSHOT
es una API declarativa que determina eficazmente los cambios en los datos de origen comparando una serie de instantáneas en orden y, a continuación, ejecuta el procesamiento necesario para el procesamiento CDC de los registros en las instantáneas. APPLY CHANGES FROM SNAPSHOT
solo es compatible con la interfaz de Python de Delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
admite la ingesta de instantáneas de varios tipos de origen:
- Use la ingesta periódica de instantáneas para ingerir instantáneas de una tabla o vista existente.
APPLY CHANGES FROM SNAPSHOT
tiene una interfaz sencilla y simplificada para admitir la ingesta periódica de instantáneas desde un objeto de base de datos existente. Se ingiere una nueva instantánea con cada actualización de canalización y el tiempo de ingesta se usa como versión de instantánea. Cuando se ejecuta una canalización en modo continuo, se ingieren varias instantáneas con cada actualización de la canalización en un período determinado por la configuración del intervalo de desencadenador para el flujo que contiene el procesamiento APPLY CHANGES FROM SNAPSHOT. - Use la ingesta de instantáneas históricas para procesar archivos que contienen instantáneas de base de datos, como instantáneas generadas a partir de una base de datos Oracle o MySQL o un almacenamiento de datos.
Para realizar el procesamiento de CDC desde cualquier tipo de origen con APPLY CHANGES FROM SNAPSHOT
, primero debe crear una tabla de streaming y, a continuación, usar la función apply_changes_from_snapshot()
en Python para especificar la instantánea, las claves y otros argumentos necesarios para implementar el procesamiento. Consulte los ejemplos de ingesta de instantáneas periódicas e ingesta de instantáneas históricas.
Las instantáneas pasadas a la API deben estar en orden ascendente por versión. Si Delta Live Tables detecta una instantánea desordenada, se produce un error.
Para obtener más información sobre la sintaxis, consulte la referencia de Python de Delta Live Tables.
Limitaciones
La columna utilizada para la secuenciación debe ser un tipo de datos ordenable.
Ejemplo: procesamiento de SCD de tipo 1 y SCD de tipo 2 con datos de origen de CDF
En las secciones siguientes, se proporcionan ejemplos de consultas SCD de tipo 1 y de tipo 2 de Delta Live Tables que actualizan las tablas de destino en función de los eventos de origen de una fuente de datos de cambios que:
- Crea nuevos registros de usuario.
- Elimina un registro de usuario.
- Actualiza registros de usuario. En el ejemplo de SCD de tipo 1, las últimas
UPDATE
operaciones llegan tarde y se anulan en la tabla de destino, lo que demuestra el control de eventos desordenados.
En los ejemplos siguientes, se supone que está familiarizado con la configuración y actualización de canalizaciones de Delta Live Tables. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.
Para ejecutar estos ejemplos, deberá comenzar creando un conjunto de datos de ejemplo. Consulte Generación de datos de prueba.
A continuación, se muestran los registros de entrada de estos ejemplos:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancún | INSERT | 2 |
123 | null | null | Delete | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Si quita la marca de comentario de la última final de los datos de ejemplo, se insertará el registro siguiente, que especifica dónde se deberían truncar los registros:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Nota:
Todos los ejemplos siguientes incluyen opciones para especificar las operaciones DELETE
y TRUNCATE
, pero cada una es opcional.
Proceso de actualizaciones de SCD de tipo 1
En el ejemplo siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 1:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Después de ejecutar el ejemplo de SCD de tipo 1, la tabla de destino contiene los registros siguientes:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancún |
Después de ejecutar el ejemplo de SCD tipo 1 con el registro adicional TRUNCATE
, los registros 124
y 126
se truncan por la operación TRUNCATE
en sequenceNum=3
y la tabla de destino contiene el registro siguiente:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Proceso de actualizaciones de SCD de tipo 2
En el ejemplo siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Después de ejecutar el ejemplo de SCD de tipo 2, la tabla de destino contiene los registros siguientes:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancún | 2 | null |
Una consulta de tipo 2 de SCD también puede especificar un subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Los cambios en otras columnas se actualizan en lugar de generar nuevos registros de historial. En el ejemplo siguiente, se muestra la exclusión de la columna city
del seguimiento:
En el ejemplo siguiente, se muestra el uso del historial de seguimiento con SCD de tipo 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Después de ejecutar este ejemplo sin el registro TRUNCATE
adicional, la tabla de destino contiene los registros siguientes:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancún | 2 | null |
Generación de datos de prueba
El código siguiente se proporciona para generar un conjunto de datos de ejemplo para su uso en las consultas de ejemplo presentes de este tutorial. Suponiendo que tenga las credenciales adecuadas para crear un nuevo esquema y crear una nueva tabla, ejecute estas instrucciones con un cuaderno o Databricks SQL. El código siguiente no se diseñó para ejecutarse como parte de una canalización de Delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Ejemplo: procesamiento periódico de instantáneas
En el ejemplo siguiente se muestra el procesamiento de SCD de tipo 2 que ingiere instantáneas de una tabla almacenada en mycatalog.myschema.mytable
. Los resultados del procesamiento se escriben en una tabla denominada target
.
mycatalog.myschema.mytable
registros en la marca de tiempo 2024-01-01 00:00:00
Key | Value |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
registros en la marca de tiempo 2024-01-01 12:00:00
Key | Value |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:
Key | Value | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | nulo |
3 | a3 | 2024-01-01 12:00:00 | nulo |
Ejemplo: procesamiento histórico de instantáneas
En el ejemplo siguiente se muestra el procesamiento de SCD de tipo 2 que actualiza una tabla de destino basada en eventos de origen de dos instantáneas almacenadas en un sistema de almacenamiento en la nube:
Instantánea en timestamp
, almacenada en /<PATH>/filename1.csv
Clave | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Instantánea en timestamp + 5
, almacenada en /<PATH>/filename2.csv
Clave | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
En el ejemplo de código siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 2 con estas instantáneas:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:
Clave | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | nulo |
Agregar, cambiar o eliminar datos en una tabla de streaming de destino
Si la canalización publica tablas en el catálogo de Unity, use instrucciones de lenguaje de manipulación de datos (DML), incluyendo las instrucciones insert, update, delete y merge, para modificar las tablas de streaming de destino creadas por instrucciones APPLY CHANGES INTO
.
Nota:
- No se admiten instrucciones DML que modifican el esquema de tabla de una tabla de streaming. Asegúrese de que las instrucciones DML no intenten evolucionar el esquema de la tabla.
- Las instrucciones DML que actualizan una tabla de streaming solo se pueden ejecutar en un clúster compartido de Unity Catalog o en un almacenamiento de SQL mediante Databricks Runtime 13.3 LTS y versiones posteriores.
- Dado que el streaming requiere orígenes de datos de solo anexión, si el procesamiento requiere streaming desde una tabla de streaming de origen con cambios (por ejemplo, por instrucciones de DML), establezca la marca skipChangeCommits al leer la tabla de streaming de origen. Cuando se establezca
skipChangeCommits
, se omitirán las transacciones que eliminen o modifiquen registros de la tabla de origen. Si el procesamiento no requiere una tabla de streaming, use una vista materializada (que no tiene la restricción de solo anexión) como tabla de destino.
Dado que Delta Live Tables usa una columna SEQUENCE BY
especificada y propaga los valores de secuenciación adecuados a las columnas __START_AT
y __END_AT
de la tabla de destino (para el SCD de tipo 2), asegúrese de que las instrucciones DML usen valores válidos para estas columnas para mantener el orden adecuado de los registros. Consulte ¿Cómo se implementa CDC con la API APPLY CHANGES?.
Para obtener más información sobre el uso de instrucciones DML con tablas de streaming, consulte Agregar, cambiar o eliminar datos en una tabla de streaming.
En el ejemplo siguiente, se inserta un registro activo con una secuencia de inicio de 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Leer una fuente de distribución de datos modificados de una tabla de destino APPLY CHANGES
En Databricks Runtime 15.2 y versiones posteriores, puede leer una fuente de distribución de datos de cambios de una tabla de streaming que sea el destino de las consultas APPLY CHANGES
o APPLY CHANGES FROM SNAPSHOT
de la misma manera que leer una fuente de distribución de datos modificados de otras tablas Delta. A continuación se requieren para leer la fuente de distribución de datos modificados de una tabla de streaming de destino:
- La tabla de streaming de destino debe publicarse en el catálogo de Unity. Consulte Utiliza el Catálogo Unity con tus canalizaciones de Tablas Delta Live.
- Para leer la fuente de distribución de datos modificados de la tabla de streaming de destino, debe usar Databricks Runtime 15.2 o superior. Para leer la fuente de distribución de datos modificados en otra canalización de Delta Live Tables, la canalización debe configurarse para usar Databricks Runtime 15.2 o superior.
Puede leer la fuente de distribución de datos modificados de una tabla de streaming de destino que se creó en una canalización de Delta Live Tables de la misma manera que leer una fuente de distribución de datos modificados de otras tablas Delta. Para obtener más información sobre el uso de la funcionalidad de fuente de distribución de datos de cambios de Delta, incluidos ejemplos en Python y SQL, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.
Nota:
El registro de fuente de distribución de datos modificados incluye metadatos que identifican el tipo de evento de cambio. Cuando se actualiza un registro en una tabla, los metadatos de los registros de cambios asociados suelen incluir valores _change_type
establecidos en eventos update_preimage
y update_postimage
.
Sin embargo, los valores _change_type
son diferentes si se realizan actualizaciones en la tabla de streaming de destino que incluyen el cambio de valores de clave principal. Cuando los cambios incluyen actualizaciones de las claves principales, los campos de metadatos _change_type
se establecen en eventos insert
y delete
. Los cambios en las claves principales pueden producirse cuando se realizan actualizaciones manuales en uno de los campos clave con una instrucción UPDATE
o MERGE
o, para las tablas de tipo SCD 2, cuando el campo __start_at
cambia para reflejar un valor de secuencia de inicio anterior.
La consulta APPLY CHANGES
determina los valores de clave principal, que difieren para el procesamiento de tipo 1 y SCD 2:
- Para el procesamiento del tipo 1 de SCD y la interfaz de Python Delta Live Tables, la clave principal es el valor del parámetro
keys
en la funciónapply_changes()
. Para la interfaz SQL de Delta Live Tables, la clave principal es las columnas definidas por la cláusulaKEYS
en la instrucciónAPPLY CHANGES INTO
. - Para el tipo SCD 2, la clave principal es el parámetro
keys
o cláusulaKEYS
más el valor devuelto de la operacióncoalesce(__START_AT, __END_AT)
, donde__START_AT
y__END_AT
son las columnas correspondientes de la tabla de streaming de destino.
Obtener datos sobre los registros procesados por una consulta CDC de Delta Live Tables
Nota:
Las siguientes métricas solo se capturan mediante consultas APPLY CHANGES
y no mediante consultas APPLY CHANGES FROM SNAPSHOT
.
Las consultas APPLY CHANGES
capturas las siguientes métricas:
num_upserted_rows
: número de filas de salida insertadas en el conjunto de datos durante una actualización.num_deleted_rows
: número de filas de salida existentes eliminadas del conjunto de datos durante una actualización.
La métrica num_output_rows
, que es la salida de los flujos que no son CDC, no se captura para las consultas apply changes
.
¿Qué objetos de datos se usan para el procesamiento de CDC de Delta Live Tables?
Nota: las siguientes estructuras de datos solo se aplican al procesamiento de APPLY CHANGES
, no al procesamiento de APPLY CHANGES FROM SNAPSHOT
.
Al declarar la tabla de destino en el metastore de Hive, se crean dos estructuras de datos:
- Vista con el nombre asignado a la tabla de destino.
- Tabla de respaldo interna usada por Delta Live Tables para administrar el procesamiento de CDC. Esta tabla se denomina anteponiendo
__apply_changes_storage_
al nombre de la tabla de destino.
Por ejemplo, si declara una tabla de destino denominada dlt_cdc_target
, verá una vista denominada dlt_cdc_target
y una tabla denominada __apply_changes_storage_dlt_cdc_target
en el metastore. La creación de una vista permite que Delta Live Tables filtre la información adicional (por ejemplo, los marcadores de exclusión y las versiones) necesaria para manipular los datos desordenados. Para ver los datos procesados, consulte la vista de destino. Dado que el esquema de la tabla __apply_changes_storage_
podría cambiar para admitir futuras características o mejoras, no se debería consultar la tabla para su uso en producción. Si agrega datos manualmente a la tabla, se presume que los registros vendrán antes que otros cambios porque faltan las columnas de versión.
Si una canalización se publica en el catálogo de Unity, las tablas de respaldo internas no serán accesibles para los usuarios.