Compartilhar via


create_auto_cdc_flow

A create_auto_cdc_flow() função cria um fluxo que usa a funcionalidade CDC (captura de dados de alteração) do Lakeflow Spark Declarative Pipelines para processar dados de origem de um CDF (feed de dados de alteração).

Observação

Essa função substitui a função apply_changes()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de create_auto_cdc_flow() destino, você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que os campos sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface python do pipeline.

Sintaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

No processamento de create_auto_cdc_flow, o comportamento padrão para eventos INSERT e UPDATE é upsert eventos CDC da origem: fazendo atualizações ou inserções para qualquer linha na tabela de destino que correspondam às chaves especificadas, ou inserir uma nova linha caso não exista um registro correspondente na tabela de destino. A manipulação de DELETE eventos pode ser especificada com o apply_as_deletes parâmetro.

Para saber mais sobre o processamento CDC com um fluxo de alterações, consulte as APIs AUTO CDC: Simplifique a captura de dados alterados com pipelines. Para obter um exemplo de como usar a create_auto_cdc_flow() função, consulte Exemplo: processamento SCD tipo 1 e tipo 2 com dados de origem CDF.

Parâmetros

Parâmetro Tipo Description
target str Obrigatório O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_flow() função.
source str Obrigatório A fonte de dados que contém os registros do CDC.
keys list Obrigatório A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
  • Uma lista de cadeias de caracteres: ["userId", "orderId"]
  • Uma lista de funções SQL col() do Spark: [col("userId"), col("orderId")]. Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
sequence_by str, col() ou struct() Obrigatório Os nomes de coluna que especificam a ordem lógica dos eventos CDC nos dados de origem. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. Você pode especificar:
  • Uma cadeia de caracteres: "sequenceNum"
  • Uma função SQL col() do Spark: col("sequenceNum"). Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
  • Uma struct() combinação de várias colunas para resolver empates: struct("timestamp_col", "id_col"), será ordenado pelo primeiro campo da struct, depois pelo segundo campo, se houver um empate, e assim por diante.
ignore_null_updates bool Permitir a ingestão de atualizações que contêm um subconjunto das colunas-alvo. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, colunas com null retêm seus valores existentes no destino. Isso também é válido para colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos com null valores.
O padrão é False.
apply_as_deletes str ou expr() Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:
  • Uma cadeia de caracteres: "Operation = 'DELETE'"
  • Uma função SQL expr() do Spark: expr("Operation = 'DELETE'")

Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma pedra tombada na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas pedras de tumba. O intervalo de retenção padrão é de dois dias e pode ser configurado com a propriedade de tabela pipelines.cdc.tombstoneGCThresholdInSeconds.
apply_as_truncates str ou expr() Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Você pode especificar:
  • Uma cadeia de caracteres: "Operation = 'TRUNCATE'"
  • Uma função SQL expr() do Spark: expr("Operation = 'TRUNCATE'")

Como essa cláusula dispara um truncamento completo da tabela de destino, ela deve ser usada apenas para casos específicos que requerem essa funcionalidade. O apply_as_truncates parâmetro tem suporte apenas para SCD tipo 1. O tipo 2 do SCD não dá suporte a operações de truncamento.
column_list ou except_column_list list Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de cadeias de caracteres ou como funções SQL col() do Spark:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.
stored_as_scd_type str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Defina para SCD 1 tipo 1 ou 2 SCD tipo 2. O padrão é SCD tipo 1.
track_history_column_list ou track_history_except_column_list list Um subconjunto das colunas de saída a ser registrado no histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. Você pode declarar o valor como uma lista de cadeias de caracteres ou como funções SQL col() do Spark:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.
name str O nome do fluxo. Se não for fornecido, o padrão será o mesmo valor de target.
once bool Opcionalmente, defina o fluxo como um fluxo único, como um backfill. Usar once=True altera o fluxo de duas maneiras:
  • O valor de retorno. streaming-query. deve ser um DataFrame em lote nesse caso, não um DataFrame de streaming.
  • O fluxo é executado uma vez por padrão. Se o pipeline for atualizado completamente, o fluxo ONCE será executado novamente para recriar os dados.