Compartir vía


create_auto_cdc_flow

La función create_auto_cdc_flow() crea un flujo que utiliza la funcionalidad de captura de datos modificados (CDC) de Lakeflow Spark Declarative Pipelines para procesar datos de origen de un flujo de datos de cambios (CDF).

Nota:

Esta función reemplaza a la función apply_changes()anterior . Las dos funciones tienen la misma firma. Databricks recomienda actualizar para usar el nuevo nombre.

Importante

Debe declarar una tabla de streaming de destino para aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la create_auto_cdc_flow() tabla de destino, debe incluir las __START_AT columnas y __END_AT con el mismo tipo de datos que los sequence_by campos.

Para crear la tabla de destino necesaria, puede usar la función create_streaming_table() en la interfaz de Python de canalización.

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Para el procesamiento de create_auto_cdc_flow, el comportamiento predeterminado para los eventos de INSERT y UPDATE es upsert eventos CDC desde el origen: actualizar las filas de la tabla de destino que coincidan con las claves especificadas o insertar una nueva fila cuando no exista un registro coincidente en la tabla de destino. El control de DELETE eventos se puede especificar con el apply_as_deletes parámetro .

Para más información sobre el procesamiento CDC con un flujo de cambios, consulte Las API AUTO CDC: Simplificación de la captura de cambios de datos con canalizaciones. Para un ejemplo de uso de la función create_auto_cdc_flow(), consulte Ejemplo: procesamiento de SCD tipo 1 y SCD tipo 2 con datos de origen de CDF.

Parámetros

Parámetro Tipo Description
target str Obligatorio. Nombre de la tabla que se va a actualizar. Puede usar la función create_streaming_table() para crear la tabla de destino antes de ejecutar la create_auto_cdc_flow() función.
source str Obligatorio. Origen de datos que contiene registros CDC.
keys list Obligatorio. Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos CDC se aplican a registros específicos de la tabla de destino. Puede especificar cualquiera de las dos opciones:
  • Una lista de cadenas: ["userId", "orderId"]
  • Lista de funciones de Spark SQL col() : [col("userId"), col("orderId")]. Los argumentos a las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no puede usar col(source.userId).
sequence_by str, col() o struct() Obligatorio. Nombres de columna que especifican el orden lógico de los eventos CDC en los datos de origen. Las canalizaciones declarativas de Spark de Lakeflow usan esta secuenciación para controlar los eventos de cambio que llegan fuera de orden. La columna especificada debe ser un tipo de datos ordenable. Puede especificar cualquiera de las dos opciones:
  • Una cadena: "sequenceNum"
  • Una función de Spark SQL col() : col("sequenceNum"). Los argumentos a las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no puede usar col(source.userId).
  • Combinación struct() de varias columnas para romper los vínculos: struct("timestamp_col", "id_col"), ordenará primero por el primer campo de estructura, después por el segundo campo si hay un empate, etc.
ignore_null_updates bool Permitir la ingesta de actualizaciones que contienen un subconjunto de las columnas de destino. Cuando un evento CDC coincide con una fila existente y ignore_null_updates es True, las columnas con un null conservan sus valores existentes en el destino. Esto también se aplica a las columnas anidadas con un valor de null. Cuando ignore_null_updates es False, los valores existentes se sobrescriben con null valores.
El valor predeterminado es False.
apply_as_deletes str o expr() Especifica cuándo se debe tratar un evento de CDC como un DELETE en lugar de un upsert. Puede especificar cualquiera de las dos opciones:
  • Una cadena: "Operation = 'DELETE'"
  • Una función sql expr() de Spark: expr("Operation = 'DELETE'")

Para controlar los datos desordenados, la fila eliminada se conserva temporalmente como una lápida en la tabla Delta subyacente y se crea una vista en el metastore que filtra estas lápidas. El intervalo de retención tiene como valor predeterminado dos días y se puede configurar con la pipelines.cdc.tombstoneGCThresholdInSeconds propiedad table.
apply_as_truncates str o expr() Especifica cuándo se debe tratar un evento CDC como una tabla TRUNCATEcompleta. Puede especificar cualquiera de las dos opciones:
  • Una cadena: "Operation = 'TRUNCATE'"
  • Una función sql expr() de Spark: expr("Operation = 'TRUNCATE'")

Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieren esta funcionalidad. El apply_as_truncates parámetro solo se admite para el tipo SCD 1. El tipo SCD 2 no admite operaciones de truncamiento.
column_list o except_column_list list Subconjunto de columnas que se van a incluir en la tabla de destino. Use column_list para especificar la lista completa de columnas que se van a incluir. Use except_column_list para especificar las columnas que se van a excluir. Puede declarar cualquier valor como una lista de cadenas o como funciones de Spark SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Los argumentos a las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no puede usar col(source.userId). El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se pasa ningún column_list argumento o except_column_list a la función .
stored_as_scd_type str o int Si se van a almacenar registros como tipo SCD 1 o SCD tipo 2. 1 Establézcalo en para el tipo 1 de SCD o 2 para el tipo 2 de SCD. El valor predeterminado es SCD de tipo 1.
track_history_column_list o track_history_except_column_list list Subconjunto de columnas de salida a las que se va a realizar un seguimiento del historial en la tabla de destino. Use track_history_column_list para especificar la lista completa de columnas a las que se va a realizar el seguimiento. Use track_history_except_column_list para especificar las columnas que se excluirán del seguimiento. Puede declarar cualquier valor como una lista de cadenas o como funciones de Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Los argumentos a las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no puede usar col(source.userId). El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se pasa ningún track_history_column_list argumento o track_history_except_column_list a la función .
name str Nombre del flujo. Si no se proporciona, el valor predeterminado es el mismo que target.
once bool Opcionalmente, defina el flujo como un flujo de un solo uso, como un reposición. El uso de once=True cambia el flujo de dos maneras:
  • El valor de retorno. streaming-query. debe ser un dataframe por lotes en este caso, no un dataframe de streaming.
  • El flujo se ejecuta una vez de forma predeterminada. Si la canalización se actualiza por completo, el flujo ONCE se ejecuta nuevamente para recrear los datos.