Partilhar via


create_auto_cdc_from_snapshot_flow

Importante

Esta funcionalidade está na Pré-visualização Pública.

A função create_auto_cdc_from_snapshot_flow cria um fluxo que utiliza a funcionalidade de captura de dados alterados (CDC) do Lakeflow Spark Declarative Pipelines para processar dados de origem a partir de instantâneos de banco de dados. Consulte Como o CDC é implementado com a AUTO CDC FROM SNAPSHOT API?.

Observação

Esta função substitui a função apply_changes_from_snapshot()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.

Importante

É necessário ter uma tabela de streaming de destino para realizar esta operação.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table( ).

Sintaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

Para AUTO CDC FROM SNAPSHOT processamento, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existe no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chaves presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento CDC com snapshots, consulte "As APIs AUTO CDC: Simplifique a captura de dados de mudança com pipelines". Para exemplos de uso da função create_auto_cdc_from_snapshot_flow(), consulte os exemplos de ingestão de instantâneo periódico e ingestão de instantâneo histórico.

Parâmetros

Parâmetro Tipo Description
target str Required. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_from_snapshot_flow() função.
source str ou lambda function Required. O nome de uma tabela ou vista para tirar snapshots periodicamente ou uma função lambda Python que retorna o DataFrame de snapshot a ser processado e a versão de snapshot. Veja Implementar o source argumento.
keys list Required. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
  • Uma lista de cadeias de caracteres: ["userId", "orderId"]
  • Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId"]. Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
stored_as_scd_type str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O padrão é SCD tipo 1.
track_history_column_list ou track_history_except_column_list list Um subconjunto de colunas de saída a serem acompanhadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do rastreamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

Implementar o source argumento

A create_auto_cdc_from_snapshot_flow() função inclui o source argumento. Para processar snapshots históricos, espera-se que o source argumento seja uma função lambda Python que retorna dois valores para a create_auto_cdc_from_snapshot_flow() função: um Python DataFrame contendo os dados de snapshot a serem processados e uma versão de snapshot.

A assinatura da função lambda é a seguinte:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão de snapshot processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame contendo o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica do instantâneo.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O ambiente de execução do Lakeflow Spark Declarative Pipelines executa as seguintes etapas sempre que o pipeline que contém a função create_auto_cdc_from_snapshot_flow() é disparado:

  1. Executa a next_snapshot_and_version função para carregar o próximo snapshot DataFrame e a versão de snapshot correspondente.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Deteta as alterações no novo instantâneo e as aplica incrementalmente à tabela de destino.
  4. Retorna à etapa #1 para carregar o próximo instantâneo e sua versão.