Uso de la fuente de distribución de datos de cambios de Delta Lake en Azure Databricks

Nota:

La fuente de distribución de datos modificados permite a Azure Databricks realizar un seguimiento de los cambios de nivel de fila entre las versiones de una tabla Delta. Cuando se habilita en una tabla Delta, el entorno de ejecución registra los eventos de cambio para todos los datos escritos en la tabla. Esto incluye los datos de fila, junto con los metadatos que indican si la fila especificada se ha insertado, eliminado o actualizado.

Puede leer los eventos de cambio en consultas por lotes mediante Spark SQL, Apache Spark DataFrames y Structured Streaming.

Importante

La fuente de distribución de datos de cambios funciona en conjunto con el historial de tablas para proporcionar información sobre los cambios. Dado que la clonación de una tabla Delta crea un historial independiente, la fuente de distribución de datos de cambios en las tablas clonadas no coincide con la de la tabla original.

Casos de uso

La fuente de distribución de datos modificados no está habilitada de forma predeterminada. Los casos de uso siguientes deben regir cuándo se habilita la fuente de distribución de datos modificados.

  • Tablas Silver y Gold: mejore el rendimiento de Delta Lake con el procesamiento exclusivo de los cambios de nivel de fila tras las operaciones iniciales MERGE, UPDATE o DELETE para acelerar y simplificar la extracción, transformación y carga de datos (ETL) y las operaciones correspondientes.
  • Vistas materializadas: cree vistas agregadas actualizadas de información para su uso en la inteligencia empresarial (BI) y los análisis sin tener que volver a procesar todas las tablas subyacentes ya que, en su lugar, se actualiza solo cuando se producen cambios.
  • Transmisión de cambios: envíe una fuente de distribución de datos modificados a sistemas descendentes, como Kafka o RDBMS, que puedan usarla para procesar de forma incremental en fases posteriores de las canalizaciones de datos.
  • Tabla de registro de auditoría: capturar la fuente de distribución de datos modificados como tabla Delta proporciona almacenamiento perpetuo y una funcionalidad de consulta eficaz para ver todos los cambios realizados a lo largo del tiempo, entre ellos cuándo se producen las eliminaciones y qué actualizaciones se han realizado.

Habilitar cambio de fuente de distribución de datos

Debe habilitar de forma explícita la opción de la fuente de distribución de datos modificados mediante uno de los métodos siguientes:

  • Nueva tabla: establezca la propiedad delta.enableChangeDataFeed = true de la tabla en el comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabla existente: establezca la propiedad delta.enableChangeDataFeed = true de la tabla en el comando ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas las tablas nuevas:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Solo se registran los cambios realizados después de habilitar la fuente de distribución de datos modificados. Los cambios anteriores de una tabla no se capturan.

Almacenamiento de datos modificados

Azure Databricks registra los datos modificados de las operaciones UPDATE, DELETE y MERGE en la carpeta _change_data en el directorio de la tabla. Algunas operaciones, como operaciones de solo inserción y eliminaciones de particiones completas, no generan datos en el directorio _change_data porque Azure Databricks puede calcular eficazmente la fuente de distribución de datos modificados directamente desde el registro de transacciones.

Los archivos de la carpeta _change_data siguen la directiva de retención de la tabla. Por lo tanto, si ejecuta el comando VACUUM, también se eliminan los datos de la fuente de distribución de datos modificados.

Lectura de cambios en las consultas por lotes

Puede proporcionar la versión o la marca de tiempo para el inicio y el final. Las versiones inicial y final y las marcas de tiempo son inclusivas en las consultas. Para leer los cambios desde una versión inicial concreta a la última versión de la tabla, especifique solo la versión inicial o la marca de tiempo.

Especifique la versión como un entero y la marca de tiempo como una cadena con el formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Si proporciona una versión anterior o una marca de tiempo anterior a otra que tenga eventos de cambio registrados, es decir, cuando la fuente de distribución de datos modificados estaba habilitada, se produce un error que indica que la fuente de distribución de datos modificados no estaba habilitada.

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Lectura de cambios en las consultas de streaming

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Para obtener los datos modificados al leer la tabla, establezca la opción readChangeFeed en true. Los valores startingVersion o startingTimestamp son opcionales y, si no se proporcionan, la secuencia devuelve la última instantánea de la tabla en el momento de hacer streaming como INSERT y los cambios futuros como datos modificados. También se admiten opciones como límites de frecuencia (maxFilesPerTrigger, maxBytesPerTrigger) y excludeRegex al leer los datos modificados.

Nota:

La limitación de frecuencia puede ser atómica para versiones distintas de la versión de la instantánea inicial. Es decir, se limitará la frecuencia de toda la versión de confirmación o se devolverá toda la confirmación.

De forma predeterminada, si un usuario pasa una versión o marca de tiempo que supera la última confirmación de una tabla, se producirá el error timestampGreaterThanLatestCommit. En Databricks Runtime 11.3 LTS y versiones posteriores, la fuente de distribución de datos modificados puede controlar el caso de versión fuera del intervalo, si el usuario establece la siguiente configuración en true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Si proporciona una versión de inicio mayor que la última confirmación en una tabla o una marca de tiempo de inicio más reciente que la última confirmación de una tabla, cuando se habilita la configuración anterior, se devuelve un resultado de lectura vacío.

Si proporciona una versión final mayor que la última confirmación en una tabla o una marca de tiempo final más reciente que la última confirmación de una tabla, cuando la configuración anterior está habilitada en modo de lectura por lotes, se devuelven todos los cambios entre la versión de inicio y la última confirmación.

¿Cuál es el esquema de la fuente de distribución de datos de cambios?

Cuando se lee de la fuente de distribución de datos de cambios de una tabla, se usa el esquema de la versión de tabla más reciente.

Nota:

La mayoría de las operaciones de cambio y evolución de esquemas son totalmente compatibles. La tabla con la asignación de columnas habilitada no admite todos los casos de uso y muestra un comportamiento diferente. Consulte Cambio de las limitaciones de la fuente de distribución de datos para las tablas con la asignación de columnas habilitada.

Además de las columnas de datos del esquema de la tabla Delta, la fuente de distribución de datos de cambios contiene columnas de metadatos que identifican el tipo de evento de cambio:

Nombre de la columna Tipo Valores
_change_type String insert, update_preimage, update_postimage, delete(1)
_commit_version long El registro Delta o la versión de la tabla que contiene el cambio.
_commit_timestamp Timestamp Marca de tiempo asociada al crear la confirmación.

(1)preimage el valor antes de la actualización, postimage es el valor después de la actualización.

Nota:

No se puede habilitar la fuente de distribución de datos modificados en una tabla si el esquema contiene columnas con los mismos nombres que estas columnas agregadas. Cambie el nombre de las columnas de la tabla para resolver este conflicto antes de intentar habilitar la fuente de distribución de datos de cambios.

Cambio de las limitaciones de la fuente de distribución de datos para las tablas con la asignación de columnas habilitada

Con la asignación de columnas habilitada en una tabla Delta, puede quitar o cambiar el nombre de las columnas de la tabla sin volver a escribir los archivos de datos para los datos existentes. Si la asignación de columnas está habilitada, la fuente de distribución de datos de cambios tiene limitaciones después de realizar cambios de esquema no aditivos, como cambiar el nombre o quitar una columna, cambiar el tipo de datos o cambiar la nulabilidad.

Importante

  • No se puede leer la fuente de distribución de datos de cambios para una transacción o intervalo en el que se produce un cambio de esquema no aditivo utilizando la semántica de lotes.
  • En Databricks Runtime 12.2 LTS y versiones posteriores, las tablas con la asignación de columnas habilitada que hayan experimentado cambios de esquema no aditivos no admiten lecturas de streaming en la fuente de distribución de datos modificados. Consulte Streaming con asignación de columnas y cambios de esquema.
  • En Databricks Runtime 11.3 LTS y versiones posteriores, no se puede leer la fuente de distribución de datos de cambios para las tablas con la asignación de columnas habilitada que hayan experimentado un cambio de nombre o una eliminación de columnas.

En Databricks Runtime 12.2 LTS y versiones posteriores, puede realizar lecturas por lotes en la fuente de distribución de datos de cambios para tablas con la asignación de columnas habilitada que hayan experimentado cambios de esquema no aditivos. En lugar de usar el esquema de la versión más reciente de la tabla, las operaciones de lectura usan el esquema de la versión final de la tabla especificada en la consulta. Todavía se produce un error en las consultas si el intervalo de versión especificado abarca un cambio de esquema no aditivo.

Preguntas más frecuentes

¿En qué medida sobrecarga habilitar la fuente de distribución de datos modificados?

No hay ningún impacto significativo. Los registros de datos modificados se generan en línea durante el proceso de ejecución de consultas y, por lo general, su tamaño es mucho menor que el tamaño total de los archivos reescritos.

¿Cuál es la directiva de retención para los registros de cambios?

Los registros de cambios siguen la misma directiva de retención que las versiones de tabla no actualizadas y se limpiarán mediante VACUUM si están fuera del período de retención especificado.

¿Cuándo están disponibles los nuevos registros en la fuente de distribución de datos modificados?

Los datos de cambios se confirman junto con la transacción de Delta Lake y estarán disponibles al mismo tiempo que los nuevos datos estén disponibles en la tabla.

Ejemplo de cuaderno: Propagación de cambios con la fuente de distribución de datos de cambios Delta

Este cuaderno se muestra cómo propagar los cambios realizados a una tabla Silver del número absoluto de vacunaciones a una tabla Gold de tasas de vacunación.

Cuaderno de fuente de distribución de datos modificados

Obtener el cuaderno