Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
Esta funcionalidad está en versión preliminar pública.
La create_auto_cdc_from_snapshot_flow función crea un flujo que usa la funcionalidad de captura de datos de cambios (CDC) de las canalizaciones declarativas de Lakeflow Spark para procesar los datos de origen de las instantáneas de base de datos. Consulte ¿Cómo se implementa CDC con la AUTO CDC FROM SNAPSHOT API?.
Nota:
Esta función reemplaza a la función apply_changes_from_snapshot()anterior . Las dos funciones tienen la misma firma. Databricks recomienda actualizar para usar el nuevo nombre.
Importante
Debe tener una tabla de streaming de destino para esta operación.
Para crear la tabla de destino necesaria, puede usar la función create_streaming_table().
Syntax
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota:
Para AUTO CDC FROM SNAPSHOT el procesamiento, el comportamiento predeterminado es insertar una nueva fila cuando un registro coincidente con las mismas claves no existe en el destino. Si existe un registro coincidente, solo se actualiza si alguno de los valores de la fila ha cambiado. Se eliminan las filas con claves presentes en el conjunto de destino, pero que ya no están presentes en el conjunto de origen.
Para obtener más información sobre el procesamiento CDC con instantáneas, consulte Las API de AUTO CDC: Simplificación de la captura de datos modificados con canalizaciones. Para obtener ejemplos de uso de la create_auto_cdc_from_snapshot_flow() función , consulte los ejemplos de ingesta de instantáneas periódicas e ingesta de instantáneas históricas .
Parámetros
| Parámetro | Tipo | Description |
|---|---|---|
target |
str |
Obligatorio. Nombre de la tabla que se va a actualizar. Puede usar la función create_streaming_table() para crear la tabla de destino antes de ejecutar la create_auto_cdc_from_snapshot_flow() función. |
source |
str o lambda function |
Obligatorio. Nombre de una tabla o vista para realizar instantáneas periódicamente o una función lambda de Python que devuelve el DataFrame de instantánea para su procesamiento y la versión de la instantánea. Consulte Implementación del source argumento. |
keys |
list |
Obligatorio. Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos CDC se aplican a registros específicos de la tabla de destino. Puede especificar cualquiera de las siguientes opciones:
|
stored_as_scd_type |
str o int |
Si se van a almacenar registros como tipo SCD 1 o SCD tipo 2.
1 Establézcalo en para el tipo 1 de SCD o 2 para el tipo 2 de SCD. El valor predeterminado es SCD de tipo 1. |
track_history_column_list o track_history_except_column_list |
list |
Subconjunto de columnas de salida a las que se va a realizar un seguimiento del historial en la tabla de destino. Use track_history_column_list para especificar la lista completa de columnas a las que se va a realizar el seguimiento. Use track_history_except_column_list para especificar las columnas que se excluirán del seguimiento. Puede declarar cualquier valor como una lista de cadenas o como funciones de Spark SQL col() :
Los argumentos para col() funciones no pueden incluir calificadores. Por ejemplo, puede usar col(userId), pero no puede usar col(source.userId). El valor predeterminado es incluir todas las columnas de la tabla de destino cuando no se pasa ningún track_history_column_list argumento o track_history_except_column_list a la función . |
Implementación del source argumento
La create_auto_cdc_from_snapshot_flow() función incluye el source argumento . Para procesar instantáneas históricas, se espera que el source argumento sea una función lambda de Python que devuelva dos valores a la create_auto_cdc_from_snapshot_flow() función: un DataFrame de Python que contiene los datos de instantánea que se van a procesar y una versión de instantánea.
A continuación se muestra la firma de la función lambda:
lambda Any => Optional[(DataFrame, Any)]
- El argumento de la función lambda es la versión de instantánea procesada más recientemente.
- El valor devuelto de la función lambda es
Noneo una tupla de dos valores: el primer valor de la tupla es un DataFrame que contiene la instantánea que se va a procesar. El segundo valor de la tupla es el que corresponde a la versión de instantánea, la cual representa el orden lógico de la misma.
Un ejemplo que implementa y llama a la función lambda:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
El entorno de ejecución de canalizaciones declarativas de Spark de Lakeflow realiza los pasos siguientes cada vez que se desencadena la canalización que contiene la create_auto_cdc_from_snapshot_flow() función:
- Ejecuta la
next_snapshot_and_versionfunción para cargar la siguiente instantánea DataFrame y la versión de instantánea correspondiente. - Si no devuelve ningún dataframe, la ejecución finaliza y la actualización de la canalización se marca como completa.
- Detecta los cambios en la nueva instantánea y los aplica incrementalmente a la tabla de destino.
- Devuelve al paso 1 para cargar la siguiente instantánea y su versión.