Compartilhar via


create_auto_cdc_from_snapshot_flow

A create_auto_cdc_from_snapshot_flow função cria um fluxo que utiliza a funcionalidade de captura de dados de alteração (CDC) do Lakeflow Spark Declarative Pipelines para processar os dados de origem a partir de instantâneos de banco de dados. Veja como o CDC é implementado com a AUTO CDC FROM SNAPSHOT API?.

Observação

Essa função substitui a função apply_changes_from_snapshot()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.

Importante

Você deve possuir uma tabela de destino de streaming para esta operação. Para criar a tabela de destino necessária, você pode usar a função create_streaming_table(). Você não pode direcionar a mesma tabela de streaming usando tanto create_auto_cdc_from_snapshot_flow() quanto create_auto_cdc_flow().

Sintaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

Para AUTO CDC FROM SNAPSHOT processamento, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existe no destino. Se um registro correspondente existir, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chaves presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento CDC com instantâneos, consulte as APIs AUTO CDC: Simplifique a captura de dados de mudanças com pipelines. Para obter exemplos de uso da função create_auto_cdc_from_snapshot_flow(), consulte os exemplos de ingestão de instantâneo periódicos e de ingestão de instantâneo histórico.

Parâmetros

Parâmetro Tipo Description
target str Obrigatório O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_from_snapshot_flow() função.
source str ou lambda function Obrigatório O nome de uma tabela ou visualização para capturar instantâneos periodicamente ou uma função lambda do Python que retorna o DataFrame do instantâneo a ser processado e a versão do instantâneo. Consulte Implementar o source argumento.
keys list Obrigatório A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
  • Uma lista de cadeias de caracteres: ["userId", "orderId"]
  • Uma lista de funções SQL col() do Spark: [col("userId"), col("orderId"]. Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
stored_as_scd_type str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Defina para SCD 1 tipo 1 ou 2 SCD tipo 2. O padrão é SCD tipo 1.
track_history_column_list ou track_history_except_column_list list Um subconjunto das colunas de saída a ser registrado no histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar o valor como uma lista de cadeias de caracteres ou como funções SQL col() do Spark:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.

Implementar o source argumento

A create_auto_cdc_from_snapshot_flow() função inclui o source argumento. Para processar instantâneos históricos, espera-se que o source argumento seja uma função lambda do Python que retorna dois valores para a create_auto_cdc_from_snapshot_flow() função: um DataFrame do Python que contém os dados de instantâneo a serem processados e uma versão de instantâneo.

Veja a seguir a assinatura da função lambda:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão de instantâneo mais recentemente processada.
  • O valor retornado da função lambda é None ou uma tupla de dois valores: o primeiro valor da tupla é um DataFrame que contém o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica do instantâneo.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O runtime do Lakeflow Spark Declarative Pipelines executa as seguintes etapas sempre que o pipeline que contém a create_auto_cdc_from_snapshot_flow() função é disparado:

  1. Executa a função next_snapshot_and_version para carregar o próximo DataFrame de instantâneo e a versão do instantâneo correspondente.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Detecta as alterações no novo instantâneo e as aplica incrementalmente à tabela de destino.
  4. Retorna à etapa número 1 para carregar o próximo instantâneo e sua versão.