Compartir vía


Uso de la fuente de distribución de datos de cambios de Delta Lake en Azure Databricks

La fuente de distribución de datos modificados permite a Azure Databricks realizar un seguimiento de los cambios de nivel de fila entre las versiones de una tabla Delta. Cuando se habilita en una tabla Delta, el entorno de ejecución registra los eventos de cambio para todos los datos escritos en la tabla. Esto incluye los datos de fila, junto con los metadatos que indican si la fila especificada se ha insertado, eliminado o actualizado.

Importante

La fuente de distribución de datos de cambios funciona en conjunto con el historial de tablas para proporcionar información sobre los cambios. Dado que la clonación de una tabla Delta crea un historial independiente, la fuente de distribución de datos de cambios en las tablas clonadas no coincide con la de la tabla original.

Procesamiento incremental de los datos de cambio

Databricks recomienda usar la fuente de distribución de datos modificados en combinación con el flujo estructurado para procesar incrementalmente los cambios de las tablas Delta. Debe usar el flujo estructurado para Azure Databricks para realizar un seguimiento automático de las versiones de la fuente de datos de cambios de la tabla.

Nota:

Delta Live Tables proporciona funcionalidad para facilitar la propagación de datos modificados y almacenar resultados como tablas SCD (dimensión de variación lenta) tipo 1 o tipo 2. Vea API APPLY CHANGES: simplificación de la captura de datos modificados con Delta Live Tables.

Para leer la fuente de distribución de datos modificados de una tabla, debe habilitar la fuente de distribución de datos modificados en esa tabla. Consulte Habilitación de la fuente de distribución de datos modificados.

Establezca la opción readChangeFeed en true al configurar una secuencia en una tabla para leer la fuente de distribución de datos modificados, como se muestra en el ejemplo de sintaxis siguiente:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

De forma predeterminada, la secuencia devuelve la instantánea más reciente de la tabla cuando la secuencia se inicia por primera vez como una INSERT y cambios futuros como datos modificados.

Las confirmaciones de datos modificados como parte de la transacción de Delta Lake y están disponibles al mismo tiempo que las nuevas confirmaciones de datos en la tabla.

Opcionalmente, puede especificar una versión inicial. Consulte ¿Debo especificar una versión inicial?.

La fuente de distribución de datos modificados también admite la ejecución por lotes, lo que requiere especificar una versión inicial. Consulte Leer cambios en las consultas por lotes.

También se admiten opciones como límites de frecuencia (maxFilesPerTrigger, maxBytesPerTrigger) y excludeRegex al leer los datos modificados.

La limitación de frecuencia puede ser atómica para versiones distintas de la versión de la instantánea inicial. Es decir, se limitará la frecuencia de toda la versión de confirmación o se devolverá toda la confirmación.

¿Debo especificar una versión inicial?

Opcionalmente, puede especificar una versión inicial si desea omitir los cambios que se produjeron antes de una versión determinada. Puede especificar una versión mediante una marca de tiempo o el número de identificador de versión registrado en el registro de transacciones delta.

Nota:

Se requiere una versión inicial para las lecturas por lotes y muchos patrones de lote pueden beneficiarse de establecer una versión final opcional.

Al volver a configurar cargas de trabajo del flujo estructurado que implican la fuente de distribución de datos modificados, es importante comprender cómo especificar una versión inicial afecta al procesamiento.

Muchas cargas de trabajo de streaming, especialmente las nuevas canalizaciones de procesamiento de datos, se benefician del comportamiento predeterminado. Con el comportamiento predeterminado, el primer lote se procesa cuando la secuencia registra primero todos los registros existentes de la tabla como INSERT operaciones en la fuente de distribución de datos modificados.

Si la tabla de destino ya contiene todos los registros con los cambios adecuados hasta un punto determinado, especifique una versión inicial para evitar procesar el estado de la tabla de origen como eventos de INSERT.

La siguiente sintaxis de ejemplo se recupera de un error de streaming en el que el punto de control estaba dañado. En este ejemplo, supongamos las condiciones siguientes:

  1. La fuente de distribución de datos modificados se ha habilitado en la tabla de origen en la creación de la tabla.
  2. La tabla de nivel inferior de destino ha procesado todos los cambios hasta la versión 75 e incluida.
  3. El historial de versiones de la tabla de origen está disponible para las versiones 70 y posteriores.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

En este ejemplo, también debe especificar una nueva ubicación de punto de control.

Importante

Si especifica una versión inicial, la secuencia no se inicia desde un nuevo punto de control si la versión inicial ya no está presente en el historial de tablas. Delta Lake limpia automáticamente las versiones históricas, lo que significa que todas las versiones iniciales especificadas se eliminan finalmente.

Consulte ¿Puedo usar la fuente de distribución de datos de cambios para reproducir todo el historial de una tabla?.

Leer cambios en las consultas por lotes

Puede usar la sintaxis de consulta por lotes para leer todos los cambios a partir de una versión determinada o para leer los cambios dentro de un intervalo de versiones especificado.

Especifique la versión como un entero y la marca de tiempo como una cadena con el formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Las versiones iniciales y finales son inclusivas en las consultas. Para leer los cambios de una versión inicial determinada a la versión más reciente de la tabla, especifique solo la versión inicial.

Si proporciona una versión anterior o una marca de tiempo anterior a otra que tenga eventos de cambio registrados, es decir, cuando la fuente de distribución de datos modificados estaba habilitada, se produce un error que indica que la fuente de distribución de datos modificados no estaba habilitada.

En los ejemplos de sintaxis siguientes se muestra cómo usar las opciones de versión inicial y final con lecturas por lotes:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Nota:

De forma predeterminada, si un usuario pasa una versión o marca de tiempo que supera la última confirmación de una tabla, se producirá el error timestampGreaterThanLatestCommit. En Databricks Runtime 11.3 LTS y versiones posteriores, la fuente de distribución de datos modificados puede controlar el caso de versión fuera del intervalo, si el usuario establece la siguiente configuración en true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Si proporciona una versión de inicio mayor que la última confirmación en una tabla o una marca de tiempo de inicio más reciente que la última confirmación de una tabla, cuando se habilita la configuración anterior, se devuelve un resultado de lectura vacío.

Si proporciona una versión final mayor que la última confirmación en una tabla o una marca de tiempo final más reciente que la última confirmación de una tabla, cuando la configuración anterior está habilitada en modo de lectura por lotes, se devuelven todos los cambios entre la versión de inicio y la última confirmación.

¿Cuál es el esquema de la fuente de distribución de datos de cambios?

Cuando se lee de la fuente de distribución de datos de cambios de una tabla, se usa el esquema de la versión de tabla más reciente.

Nota:

La mayoría de las operaciones de cambio y evolución de esquemas son totalmente compatibles. La tabla con la asignación de columnas habilitada no admite todos los casos de uso y muestra un comportamiento diferente. Consulte Cambio de las limitaciones de la fuente de distribución de datos para las tablas con la asignación de columnas habilitada.

Además de las columnas de datos del esquema de la tabla Delta, la fuente de distribución de datos de cambios contiene columnas de metadatos que identifican el tipo de evento de cambio:

Nombre de la columna Tipo Valores
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version long El registro Delta o la versión de la tabla que contiene el cambio.
_commit_timestamp Timestamp Marca de tiempo asociada al crear la confirmación.

(1) preimage es el valor antes de la actualización, postimage es el valor después de la actualización.

Nota:

No se puede habilitar la fuente de distribución de datos modificados en una tabla si el esquema contiene columnas con los mismos nombres que estas columnas agregadas. Cambie el nombre de las columnas de la tabla para resolver este conflicto antes de intentar habilitar la fuente de distribución de datos de cambios.

Habilitar cambio de fuente de distribución de datos

Solo puede leer la fuente de distribución de datos de cambios para las tablas habilitadas. Debe habilitar de forma explícita la opción de la fuente de distribución de datos modificados mediante uno de los métodos siguientes:

  • Nueva tabla: establezca la propiedad delta.enableChangeDataFeed = true de la tabla en el comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabla existente: establezca la propiedad delta.enableChangeDataFeed = true de la tabla en el comando ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas las tablas nuevas:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Solo se registran los cambios realizados después de habilitar la fuente de distribución de datos de cambios. Los cambios pasados en una tabla no se capturan.

Almacenamiento de datos modificados

La habilitación de la fuente de distribución de datos modificados provoca un pequeño aumento de los costos de almacenamiento de una tabla. Los registros de datos modificados se generan a medida que se ejecuta la consulta y, por lo general, son mucho más pequeños que el tamaño total de los archivos reescritos.

Azure Databricks registra los datos modificados de las operaciones UPDATE, DELETE y MERGE en la carpeta _change_data en el directorio de la tabla. Algunas operaciones, como las operaciones de solo inserción y las eliminaciones de particiones completas, no generan datos en el directorio _change_data porque Azure Databricks puede calcular eficazmente la fuente de distribución de datos de cambios directamente desde el registro de transacciones.

Todas las lecturas de los archivos de datos de la carpeta _change_data deben pasar por las API de Delta Lake compatibles.

Los archivos de la carpeta _change_data siguen la directiva de retención de la tabla. Los datos de fuente de distribución de datos modificados se eliminan cuando se ejecuta el comando VACUUM.

¿Puedo usar la fuente de distribución de datos modificados para reproducir todo el historial de una tabla?

La fuente de distribución de datos modificados no está pensada para servir como un registro permanente de todos los cambios en una tabla. La fuente de distribución de datos modificado solo registra los cambios que se producen después de habilitarla’.

La fuente de distribución de datos modificados y Delta Lake le permiten reconstruir siempre una instantánea completa de una tabla de origen, lo que significa que puede iniciar una nueva lectura de streaming en una tabla con la fuente de distribución de datos modificado habilitada y capturar la versión actual de esa tabla y todos los cambios que se producen después.

Debe tratar los registros de la fuente de distribución de datos modificados como transitorios y solo accesibles para una ventana de retención especificada. El registro de transacciones delta quita las versiones de tabla y sus correspondientes versiones de fuente de distribución de datos modificados a intervalos regulares. Cuando se quita una versión del registro de transacciones, ya no se puede leer la fuente de distribución de datos de cambios de esa versión.

Si el caso de uso requiere mantener un historial permanente de todos los cambios en una tabla, debe usar lógica incremental para escribir registros de la fuente de distribución de datos modificados en una nueva tabla. En el ejemplo de código siguiente se muestra el uso de trigger.AvailableNow, que aprovecha el procesamiento incremental de Structured Streaming, pero procesa los datos disponibles como una carga de trabajo por lotes. Puede programar esta carga de trabajo de forma asincrónica con las canalizaciones de procesamiento principales para crear una copia de seguridad de la fuente de datos de cambios con fines de auditoría o capacidad de reproducción completa.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Cambio de las limitaciones de la fuente de distribución de datos para las tablas con la asignación de columnas habilitada

Con la asignación de columnas habilitada en una tabla Delta, puede quitar o cambiar el nombre de las columnas de la tabla sin volver a escribir los archivos de datos para los datos existentes. Si la asignación de columnas está habilitada, la fuente de distribución de datos de cambios tiene limitaciones después de realizar cambios de esquema no aditivos, como cambiar el nombre o quitar una columna, cambiar el tipo de datos o cambiar la nulabilidad.

Importante

  • No se puede leer la fuente de distribución de datos de cambios para una transacción o intervalo en el que se produce un cambio de esquema no aditivo utilizando la semántica de lotes.
  • En Databricks Runtime 12.2 LTS y versiones posteriores, las tablas con la asignación de columnas habilitada que hayan experimentado cambios de esquema no aditivos no admiten lecturas de streaming en la fuente de distribución de datos modificados. Consulte Streaming con asignación de columnas y cambios de esquema.
  • En Databricks Runtime 11.3 LTS y versiones posteriores, no se puede leer la fuente de distribución de datos de cambios para las tablas con la asignación de columnas habilitada que hayan experimentado un cambio de nombre o una eliminación de columnas.

En Databricks Runtime 12.2 LTS y versiones posteriores, puede realizar lecturas por lotes en la fuente de distribución de datos de cambios para tablas con la asignación de columnas habilitada que hayan experimentado cambios de esquema no aditivos. En lugar de usar el esquema de la versión más reciente de la tabla, las operaciones de lectura usan el esquema de la versión final de la tabla especificada en la consulta. Todavía se produce un error en las consultas si el intervalo de versión especificado abarca un cambio de esquema no aditivo.