APPLY CHANGES API: simplificación de la captura de datos modificados en Delta Live Tables

Delta Live Tables simplifica la captura de datos modificados (CDC) con la API de APPLY CHANGES. Anteriormente, la MERGE INTO instrucción se usaba normalmente para procesar registros CDC en Azure Databricks. Sin embargo, MERGE INTO puede generar resultados incorrectos debido a registros fuera de secuencia o requerir lógica compleja para volver a ordenar los registros.

Al controlar automáticamente los registros fuera de secuencia, la API de APPLY CHANGES en Delta Live Tables garantiza el procesamiento correcto de los registros CDC y elimina la necesidad de desarrollar lógica compleja para controlar registros fuera de secuencia.

La API APPLY CHANGES se admite en las interfaces SQL y Python de Delta Live Tables, incluida la compatibilidad con la actualización de tablas con el tipo SCD 1 y el tipo 2:

  • Use SCD de tipo 1 para actualizar los registros directamente. No se conserva el historial de los registros que se actualizan.
  • Use el SCD de tipo 2 para conservar un historial de registros, ya sea en todas las actualizaciones o en actualizaciones de un conjunto especificado de columnas.

Para ver la sintaxis y otras referencias, consulte:

Nota:

En este artículo se describe cómo actualizar tablas en la canalización de Delta Live Tables en función de los cambios en los datos de origen. Para aprender a registrar y consultar información de cambio a nivel de fila para las tablas Delta, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.

¿Cómo se implementa la CDC con Delta Live Tables?

Se debe especificar una columna en los datos de origen sobre los que se secuenciarán los registros, que Delta Live Tables interpreta como una representación que crece de forma regular de la ordenación adecuada de los datos de origen. Delta Live Tables controla automáticamente los datos que llegan desordenados. Para los cambios de tipo 2 de SCD, Delta Live Tables propaga los valores de secuenciación adecuados en las columnas __START_AT y __END_AT de la tabla de destino. Debería haber una actualización distinta por clave en cada valor de secuenciación y no se admiten valores de secuenciación NULL.

Para realizar el procesamiento de CDC con Delta Live Tables, primero se debe crear una tabla de streaming y, a continuación, usar una instrucción APPLY CHANGES INTO para especificar el origen, las claves y la secuenciación de la fuente de cambios. Para crear la tabla de streaming de destino, use la instrucción CREATE OR REFRESH STREAMING TABLE en SQL o la función create_streaming_table() en Python. Para crear la instrucción que defina el procesamiento de CDC, use la instrucción APPLY CHANGES en SQL o la función apply_changes() en Python. Para obtener más información sobre la sintaxis, consulte Captura de datos modificados con SQL en Delta Live Tables o Captura de datos modificados con Python en Delta Live Tables.

¿Qué objetos de datos se usan para el procesamiento de CDC de Delta Live Tables?

Al declarar la tabla de destino en el metastore de Hive, se crean dos estructuras de datos:

  • Vista con el nombre asignado a la tabla de destino.
  • Tabla de respaldo interna usada por Delta Live Tables para administrar el procesamiento de CDC. Esta tabla se denomina anteponiendo __apply_changes_storage_ al nombre de la tabla de destino.

Por ejemplo, si declara una tabla de destino denominada dlt_cdc_target, verá una vista denominada dlt_cdc_target y una tabla denominada __apply_changes_storage_dlt_cdc_target en el metastore. La creación de una vista permite que Delta Live Tables filtre la información adicional (por ejemplo, los marcadores de exclusión y las versiones) necesaria para manipular los datos desordenados. Para ver los datos procesados, consulte la vista de destino. Dado que el esquema de la tabla __apply_changes_storage_ podría cambiar para admitir futuras características o mejoras, no se debería consultar la tabla para su uso en producción. Si agrega datos manualmente a la tabla, se presume que los registros vendrán antes que otros cambios porque faltan las columnas de versión.

Si una canalización se publica en el catálogo de Unity, las tablas de respaldo internas no serán accesibles para los usuarios.

Obtener datos sobre los registros procesados por una consulta CDC de Delta Live Tables

Las consultas apply changes capturas las siguientes métricas:

  • num_upserted_rows: número de filas de salida insertadas en el conjunto de datos durante una actualización.
  • num_deleted_rows: número de filas de salida existentes eliminadas del conjunto de datos durante una actualización.

La métrica num_output_rows, que es la salida de los flujos que no son CDC, no se captura para las consultas apply changes.

Limitaciones

El destino de la consulta APPLY CHANGES INTO o la función apply_changes no se puede usar como origen de una tabla de streaming. Una tabla que lee del destino de una consulta APPLY CHANGES INTO o función apply_changes debe ser una vista materializada.

SCD tipo 1 y SCD tipo 2 en Azure Databricks

En las secciones siguientes, se proporcionan ejemplos que muestran las consultas SCD de tipo 1 y de tipo 2 de Delta Live Tables que actualizan las tablas de destino en función de los eventos de origen que:

  1. Crean registros de usuario.
  2. Eliminan un registro de usuario.
  3. Actualizan registros de usuario. En el ejemplo de SCD de tipo 1, las últimas UPDATE operaciones llegan tarde y se anulan en la tabla de destino, lo que demuestra el control de eventos desordenados.

En los ejemplos siguientes, se supone que está familiarizado con la configuración y actualización de canalizaciones de Delta Live Tables. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.

Para ejecutar estos ejemplos, deberá comenzar creando un conjunto de datos de ejemplo. Consulte Generación de datos de prueba.

A continuación, se muestran los registros de entrada de estos ejemplos:

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancún INSERT 2
123 null null Delete 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si quita la marca de comentario de la última final de los datos de ejemplo, se insertará el registro siguiente, que especifica dónde se deberían truncar los registros:

userId name city operation sequenceNum
null null null TRUNCATE 3

Nota:

Todos los ejemplos siguientes incluyen opciones para especificar las operaciones DELETE y TRUNCATE, pero cada una de ellas es opcional.

Proceso de actualizaciones de SCD de tipo 1

En el ejemplo de código siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Después de ejecutar el ejemplo de SCD de tipo 1, la tabla de destino contiene los registros siguientes:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancún

Después de ejecutar el ejemplo de SCD tipo 1 con el registro adicional TRUNCATE, los registros 124 y 126 se truncan por la operación TRUNCATE en sequenceNum=3 y la tabla de destino contiene el registro siguiente:

userId name city
125 Mercedes Guadalajara

Proceso de actualizaciones de SCD de tipo 2

En el ejemplo de código siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Después de ejecutar el ejemplo de SCD de tipo 2, la tabla de destino contiene los registros siguientes:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancún 2 null

Una consulta de tipo 2 de SCD también puede especificar un subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Los cambios en otras columnas se actualizan en lugar de generar nuevos registros de historial. En el ejemplo siguiente, se muestra la exclusión de la columna city del seguimiento:

En el ejemplo siguiente, se muestra el uso del historial de seguimiento con SCD de tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Después de ejecutar este ejemplo sin el registro TRUNCATE adicional, la tabla de destino contiene los registros siguientes:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancún 2 null

Generación de datos de prueba

El código siguiente se proporciona para generar un conjunto de datos de ejemplo para su uso en las consultas de ejemplo presentes de este tutorial. Suponiendo que tenga las credenciales adecuadas para crear un nuevo esquema y crear una nueva tabla, ejecute estas instrucciones con un cuaderno o Databricks SQL. El código siguiente no se diseñó para ejecutarse como parte de una canalización de Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Agregar, cambiar o eliminar datos en una tabla de streaming de destino

Si la canalización publica tablas en el catálogo de Unity, use instrucciones de lenguaje de manipulación de datos (DML), incluyendo las instrucciones insert, update, delete y merge, para modificar las tablas de streaming de destino creadas por instrucciones APPLY CHANGES INTO.

Nota:

  • No se admiten instrucciones DML que modifican el esquema de tabla de una tabla de streaming. Asegúrese de que las instrucciones DML no intenten evolucionar el esquema de la tabla.
  • Las instrucciones DML que actualizan una tabla de streaming solo se pueden ejecutar en un clúster compartido de Unity Catalog o en un almacenamiento de SQL mediante Databricks Runtime 13.3 LTS y versiones posteriores.
  • Dado que el streaming requiere orígenes de datos de solo anexión, si el procesamiento requiere streaming desde una tabla de streaming de origen con cambios (por ejemplo, por instrucciones de DML), establezca la marca skipChangeCommits al leer la tabla de streaming de origen. Cuando se establezca skipChangeCommits, se omitirán las transacciones que eliminen o modifiquen registros de la tabla de origen. Si el procesamiento no requiere una tabla de streaming, use una vista materializada (que no tiene la restricción de solo anexión) como tabla de destino.

Dado que Delta Live Tables usa una columna SEQUENCE BY especificada y propaga los valores de secuenciación adecuados a las columnas __START_AT y __END_AT de la tabla de destino (para el SCD de tipo 2), asegúrese de que las instrucciones DML usen valores válidos para estas columnas para mantener el orden adecuado de los registros. Consulte ¿Cómo se implementa la CDC con Delta Live Tables?.

Para obtener más información sobre el uso de instrucciones DML con tablas de streaming, consulte Agregar, cambiar o eliminar datos en una tabla de streaming.

En el ejemplo siguiente, se inserta un registro activo con una secuencia de inicio de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);